source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 6819

Last change on this file since 6819 was 6819, checked in by mabarracus, 19 months ago

Fix test suite
Consolidation daemon ported to python3
Avoid daemon running with multiple instances
Fixes indent,sizes in fonts and graphics
System stats visualization page
Minimized javascript files
New cache for visualization graphs minimizing overloading and DoS attacks
Improved scheduler with clients sending 10+ results
Support to provide blacklist
Support to platform data

  • Property svn:executable set to *
File size: 36.4 KB
Line 
1#!/usr/bin/env python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7#import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39MAX_SLOW_CHECK_CLIENTS = 30
40
41if DEBUG:
42    if MIN_LOG_LEVEL:
43        loglevel=MIN_LOG_LEVEL
44    else:
45        loglevel=logging.DEBUG
46else:
47    loglevel=logging.INFO
48
49LOGGING = {
50    'version': 1,
51    'disable_existing_loggers': False,
52    'formatters': {
53        'verbose': {
54            'format': '%(levelname)s %(module)s %(message)s'
55            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
56            },
57        },
58    'handlers': {
59        'stdout': {
60            'class': 'logging.StreamHandler',
61            'stream': sys.stdout,
62            'formatter': 'verbose',
63            },
64        'sys-logger6': {
65            'class': 'logging.handlers.SysLogHandler',
66            'address': '/dev/log',
67            'facility': "local6",
68            'formatter': 'verbose',
69            },
70        },
71    'loggers': {
72        'analyticsd-logger': {
73            'handlers': ['sys-logger6','stdout'],
74            'level': loglevel,
75            'propagate': True,
76            },
77        }
78    }
79
80def to_str(x):
81    if isinstance(x,str):
82        return to_utf(x).decode('unicode_escape')
83    else:
84        return x
85
86def to_utf(x):
87    if isinstance(x,str):
88        return x.encode('utf-8')
89    else:
90        return x
91
92def printea(msg="",level='critical'):
93    if level == 'critical':
94        logger.critical(msg)
95    elif level == 'error':
96        logger.error(msg)
97    elif level == 'warning':
98        logger.warning(msg)
99    elif level == 'info':
100        logger.info(msg)
101    else:
102        logger.debug(msg)
103
104def keepalive(who=""):
105    global DEBUG_PRINT_ALIVE_COUNTER
106    t=str(int(time.time()))
107    if DEBUG:
108        printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
109    else:
110        if who == 'main':
111            if DEBUG_PRINT_ALIVE_COUNTER > 0:
112                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
113            else:
114                DEBUG_PRINT_ALIVE_COUNTER=60
115                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
116
117    fp = open('/var/run/analyticsd.keepalive','w');
118    fp.write(t)
119    fp.close()
120
121config.dictConfig(LOGGING)
122logger = logging.getLogger('analyticsd-logger')
123
124DBNAME=None
125USER=None
126PASS=None
127IP=None
128
129EMPTY_PAUSED_SLEEP=1
130CHECK_LOAD_TIME=5
131MAX_RETRIES=5
132TIMED_ON=False
133
134try:
135    with open(FILE,'r') as f:
136        for line in f:
137            if not IP:
138                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
139                if IP:
140                    IP = IP.group(1)
141            if not DBNAME:
142                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
143                if DBNAME:
144                    DBNAME = DBNAME.group(1)
145            if not USER:
146                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
147                if USER:
148                    USER = USER.group(1)
149            if not PASS:
150                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
151                if PASS:
152                    PASS = PASS.group(1)
153    if not (IP or DBNAME or USER or PASS):
154        printea("Couldn't get database configuration from {}".format(FILE))
155    else:
156        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
157except Exception as e:
158    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
159    sys.exit(1)
160
161if not (IP or DBNAME or USER or PASS):
162    printea("Couldn't get database configuration from {}".format(FILE))
163    sys.exit(1)
164
165if THREADED:
166    THREADS=['main','server','client','desktop','other']
167else:
168    THREADS=['main']
169
170class DB():
171    def __init__(self,mon,t='main'):
172        self.t=t
173        self.reconnect=0
174        self.empty = False
175        self.conn=None
176        self.mon=mon
177        self.q=queue.Queue()
178        self.processed=0
179        self.need_clean = False
180        printea('Database worker {} initialized'.format(t),'info')
181
182    def timed(func):
183        @wraps(func)
184        def wrapper(*args,**kwargs):
185            if TIMED_ON:
186                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
187            ret=func(*args,**kwargs)
188            if TIMED_ON:
189                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
190            return ret
191        return wrapper
192
193    def with_retry(func):
194        @wraps(func)
195        def wrapper(*args,**kwargs):
196            if 'retry' not in kwargs:
197                kwargs['retry']=1
198            if 'mon' not in kwargs:
199                kwargs['mon'] = None
200            try:
201                return func(*args,**kwargs) 
202            except Exception as e:
203                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
204                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
205                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
206                    if kwargs['mon']:
207                        kwargs['mon'].term()
208                        sys.exit(1)
209                    return None
210                else:
211                    time.sleep(kwargs['retry']**2)
212                    kwargs['retry']+=1
213                    return wrapper(*args,**kwargs)
214            return result
215        return wrapper
216
217    def with_debug(func):
218        @wraps(func)
219        def wrapper(*args,**kwargs):
220            if 'query' in kwargs:
221                printea("executing query: {}".format(kwargs['query']),'debug')
222            return func(*args,**kwargs)
223        return wrapper
224
225    @with_debug
226    def execute(self,*args,**kwargs):
227        if 'query' not in kwargs:
228            printea("Warning execute called whithout query",'info')
229            return None
230        try:
231            return self.cur.execute(kwargs['query'])
232        except Exception as e:
233            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
234
235    def init_db(self):
236        try:
237            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
238            self.conn.autocommit(False)
239            self.cur = self.conn.cursor()
240            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
241            variables=self.check_server_variables()
242
243            if variables:
244                self.mon.server_variables=variables
245
246            printea("Connected succesfully {}".format(self.t),'info')
247
248        except mdb.Error as e:
249            printea("Error {}: {}".format(e.args[0],e.args[1]))
250            raise Exception(e)
251
252    def check_server_variables(self):
253        if self.t != 'main':
254            return None
255        else:
256            printea('Getting server variables','info')
257            result = {}
258            self.cur.execute("show global variables")
259            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
260            if self.cur.rowcount > 0:
261                for i in range(self.cur.rowcount):
262                    var_name, var_value = self.cur.fetchone()
263                    result.setdefault(var_name,var_value)
264            return result
265
266    def get_config(self):
267        values={}
268        self.cur.execute('select name,value from Config')
269        for val in self.cur.fetchall():
270            values.setdefault(val[0],val[1])
271        return values
272
273    def put_config(self,values):
274        vals=[]
275        for x in values.keys():
276            vals.append("('{}','{}')".format(x,values[x]))
277        try:
278            self.cur.execute("insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
279            self.conn.commit()
280        except Exception as e:
281            print(e)
282
283    def check_temporary_tables_size(self,table_name):
284        if self.t != 'load':
285            return None
286        else:
287            size = 0
288            rows = 0
289            self.cur.execute('select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
290
291            res=self.cur.fetchone()
292            size = float(res[0])
293            rows = int(res[1])
294            return (int(size),rows)
295
296    def close_db(self):
297        if self.conn:
298            self.conn.close()
299            printea("Closed connection {}".format(self.t),'info')
300
301    def reduce_flavour(self,version,flavour):
302        if version == '15':
303            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
304                return 'desktop'
305            elif 'server' in flavour:
306                return 'server'
307            elif 'client' in flavour:
308                return 'client' 
309        elif version == '16':
310            if 'server' in flavour:
311                return 'server'
312            elif 'client' in flavour:
313                return 'client'
314            elif 'desktop' in flavour:
315                return 'desktop'
316        return 'other'
317
318    def reduce_version(self,version):
319        if version[0:2] in ['15','16']:
320            return version[0:2]
321        else:
322            return 'other'
323
324    def gen_uuid(self,*args,**kwargs):
325        try:
326            string=to_utf(u'-'.join([str(x) for x in args]))
327            h=hashlib.sha1(string)
328            return int(h.hexdigest()[0:16],16)
329        except Exception as e:
330            print(traceback.format_exc())
331
332    @timed
333    @with_retry
334    def get_client(self,*args,**kwargs):
335        try:
336            query="SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
337            self.execute(query=query)
338            ret =[]
339            if self.cur.rowcount > 0:
340                for i in range(self.cur.rowcount):
341                    v_id,v_date,v_user,v_version,v_flavour,v_arch,v_mem,v_vga,v_cpu,v_ncpu=self.cur.fetchone()
342                    version=self.reduce_version(v_version)
343                    flavour=self.reduce_flavour(version,v_flavour)
344                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
345                    if not v_arch:
346                        v_arch = 'NULL'
347                    if not v_mem:
348                        v_mem = 'NULL'
349                    if not v_vga:
350                        v_vga = 'NULL'
351                    if not v_cpu:
352                        v_cpu = 'NULL'
353                    if not v_ncpu:
354                        v_ncpu = 'NULL'
355                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour,'arch':v_arch,'mem':v_mem,'vga':v_vga,'cpu':v_cpu,'ncpu':v_ncpu})
356                return ret
357            else:
358                return True
359        except Exception as e:
360            raise Exception("Error getting client: {}".format(e))
361
362    @timed
363    @with_retry
364    def get_apps(self,*args,**kwargs):
365        if 'clients' not in kwargs:
366            printea("Warning executed without named parameter clients",'info')
367            return None
368        ret = {}
369        try:
370            cids_to_rel_fla={}
371            for c in kwargs['clients']:
372                if c['id'] not in cids_to_rel_fla:
373                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
374                if c['flavour'] not in ret:
375                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
376            for c in kwargs['clients']:
377                ret[c['flavour']]['clients'].append(c)
378
379            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
380            self.execute(query=query)
381            if self.cur.rowcount > 0:
382                for i in range(self.cur.rowcount):
383                    row = self.cur.fetchone()
384                    clid = row[0]
385                    rel,fla = cids_to_rel_fla[clid]
386                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
387                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
388            return ret
389        except Exception as e:
390            raise Exception("Error getting apps: {}".format(e))
391
392    @timed
393    @with_retry
394    def put_client(self,*args,**kwargs):
395        if 'client_list' not in kwargs:
396            raise Exception("Called without named parameter client_list")
397        values= []
398        for cli in kwargs['client_list']:
399            values.append("({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{})".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour'],cli['arch'],cli['mem'],cli['vga'],cli['cpu'],cli['ncpu']))
400        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
401        self.execute(query=query)
402        return True
403
404    @timed
405    @with_retry
406    def put_apps(self,*args,**kwargs):
407        if 'apps' not in kwargs:
408            raise Exception("Called without named parameter apps")
409        app_list = {}
410        for app in kwargs['apps']:
411            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
412            if str(app['uuid']) not in app_list:
413                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
414            else:
415                app_list[str(app['uuid'])]['value']+=app['value']
416        values = []
417        for app in app_list:
418            item=app_list[app]
419            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
420        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
421        self.execute(query=query)
422        return True
423
424    @timed
425    @with_retry
426    def del_client(self,*args,**kwargs):
427        if 'client_list' not in kwargs:
428            raise Exception("Called without named parameter client_list")
429        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
430        self.execute(query=query)
431        return True
432
433    @timed
434    @with_retry
435    def del_apps(self,*args,**kwargs):
436        if 'client_list' not in kwargs:
437            raise Exception("Called without named parameter client_list")
438        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
439        self.execute(query=query)
440        return True
441
442    @timed
443    def reset_autoinc(self):
444        try:
445            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
446            self.execute(query=query)
447            ainc = self.cur.fetchone()[0]
448            #query = "SELECT count(*) from tmp_clients"
449            #self.execute(query=query)
450            #cli_size=self.cur.fetchone()[0]
451            #query = "SELECT count(*) from tmp_packages"
452            #self.execute(query=query)
453            #pkg_size=self.cur.fetchone()[0]
454            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
455            if ainc > 65500 and self.mon.all_queues_empty():
456            #and cli_size == 0 and pkg_size == 0:
457                query = "TRUNCATE TABLE tmp_clients"
458                self.execute(query=query)
459                query = "TRUNCATE TABLE tmp_packages"
460                self.execute(query=query)
461            return True
462        except Exception as e:
463            raise Exception("Error reseting auto_increment: {}".format(e))
464
465
466    @timed
467    def process_main_thread(self,*args,**kwargs):
468        if self.mon.slow_check_clients > 0 :
469            self.mon.slow_check_clients -= 1
470            time.sleep(1)
471            return True
472        else:
473            self.mon.slow_check_clients = self.mon.default_slow_check_clients
474
475        clis=self.get_client(mon=self.mon)
476        if clis == True: #No clients found (empty)
477            self.empty = True # if > 65500 auto_increment was reset
478            self.mon.schedule(event='NO_MORE_CLIENTS')
479            return False
480        #clients found
481        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
482        # if clients found get apps
483        lapps = self.get_apps(clients=clis,mon=self.mon)
484        #lapps can be empty
485        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
486        #If deletion was failed , thread was died
487        lapps_tmp={'apps':[],'clients':[]}
488        for fla in lapps:
489            if THREADED:
490                lapps[fla]['timestamp']=time.time()
491                self.mon.db[fla].q.put(lapps[fla],True)
492            else:
493                lapps_tmp['apps'].extend(lapps[fla]['apps'])
494                lapps_tmp['clients'].extend(lapps[fla]['clients'])
495                self.mon.db['main'].q.put(lapps_tmp,True)
496        #if DEBUG:
497        self.processed+=len(clis)
498        return True
499
500    @timed
501    def process_all_threads(self,*args,**kwargs):
502        lapps=self.q.get(True)
503        #print "Running {}".format(self.t)
504        if THREADED:
505            while (lapps['timestamp'] > self.mon.commited):
506                time.sleep(0.001)
507        if len(lapps['apps']) != 0 and len(lapps['clients']) != 0:
508            #IF FAIL, AFTER RETRIES THREAD DIES
509            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
510                self.q.put(lapps,True) # USELESS
511                return False    # USELESS
512            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
513                self.q.put(lapps,True) # USELESS
514                return False    # USELESS
515            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
516                self.q.put(lapps,True) # USELESS
517                return False    # USELESS
518        #if DEBUG:
519        self.processed+=len(lapps['clients'])
520        return True
521
522    def process(self,*args,**kwargs):
523        keepalive(self.t)
524        # code only for main thread
525        printea("Running thread {}".format(self.t),'debug')
526        if self.t == 'main' and not self.mon.terminate:
527            ret=self.process_main_thread(*args,**kwargs)
528            if ret == False: #No more clients available
529                return True
530            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
531                if THREADED:
532                    return True #Need return to avoid put empty flag and wait into main thread
533
534        #after this poing, code for all threads
535        if not self.q.empty():
536            ret=self.process_all_threads(*args,**kwargs)
537            if ret == False: # USELESS? THREADS was died
538                printea("Error threads")
539                return ret
540        else: 
541            del self.q
542            self.q = queue.Queue()
543            self.empty = True
544        return True
545
546    def worker(self):
547        printea("Starting worker {} processing".format(self.t),'info')
548        while not (self.mon.terminate and self.empty):
549            if self.mon.paused or self.empty:
550                #if self.empty and self.t == 'main':
551                #    self.reset_autoinc()
552                if self.mon.paused:
553                    printea("Paused by high load {}".format(self.t),'debug')
554                if self.empty:
555                    printea("Empty queue {} sleeping by now".format(self.t),'debug')
556                if self.empty:
557                    self.empty = False
558                time.sleep(EMPTY_PAUSED_SLEEP)
559            else:
560                try:
561                    if self.conn == None:
562                        raise EnvironmentError('Connection not available, probably it\'s a first run')
563                    else:
564                        self.conn.begin()
565                    if self.process():
566                        self.conn.commit()
567                        self.mon.commited=time.time()
568                        self.reconnect = 0
569                        if self.need_clean:
570                            gc.collect()
571                    else:
572                        self.conn.rollback()
573                except EnvironmentError as e:
574                    printea(e)
575                    self.init_db()
576
577                except Exception as e:
578                    try:
579                        if self.conn != None:
580                            self.conn.rollback()
581                    except:
582                        printea("Can't rollback last actions",'info')
583                        pass
584                    #if e[0] != 2006:
585                    #    printea("Exception processing worker({}): {}".format(self.t,e))
586                    #if e[0] == 2006: # SERVER GONE AWAY
587                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
588
589                    printea("Trying to recover connection ({})".format(self.t))
590                    if self.reconnect == 100:
591                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
592                        self.mon.term()
593                    else:
594                        self.reconnect+=1
595                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
596                        time.sleep(self.reconnect*self.reconnect)
597
598                        try:
599                            self.init_db()
600                            printea("Recovered worker {} connection".format(self.t),'info')
601                        except:
602                            printea('Unable to initialize worker {}'.format(self.t))
603                            pass
604
605
606class Monitor():
607    def __init__(self):
608        self.MAX_QUEUE_UTILIZATION = 100
609        self.USE_MAX_QUEUES = True
610        self.MAX_SELECT_WINDOW = (2 ** 13) +1
611        self.MIN_SELECT_WINDOW = 32
612        self.MEM_USED=0
613        self.MAX_MEM=512
614        self.MIN_FREE_MEM_SERVER=100
615        self.USE_MAX_MEM=True
616        self.lock = Lock()
617        self.terminate = False
618        self.finished = False
619        self.paused = False
620        self.select_window = self.MIN_SELECT_WINDOW
621        self.commited = time.time()
622        self.procesed = 0
623        self.procesed_per_sec = [0]*10
624        self.load = 0
625        self.server_variables = None  # initialized by main worker
626        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
627        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
628        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
629        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
630        self.default_slow_check_clients = 1
631        self.slow_check_clients = self.default_slow_check_clients
632        self.server_mem = 0
633        self.loadlist = [ 0.0 ] * 100
634        self.max_heap = None
635
636        signal.signal(signal.SIGQUIT,self.term)
637        signal.signal(signal.SIGTERM,self.term)
638        signal.signal(signal.SIGINT,self.term)
639
640        self.db = {}
641        self.threads = {}
642        for x in THREADS:
643            self.db[x] = DB(self,x)
644            #try:
645            #    self.db[x].init_db()
646            #except Exception as e:
647            #    printea('Error initializing database connections: {}'.format(str(e)))
648            #    sys.exit(1)
649        self.cfg= None
650        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
651
652    def windowctl(self, *args, **kwargs):
653        if args[0] == '+':
654            if self.select_window*2 < self.MAX_SELECT_WINDOW:
655                self.select_window*=2
656                self.select_window=int(self.select_window)
657        if args[0] == '-':
658            if self.select_window > self.MIN_SELECT_WINDOW:
659                self.select_window/=2
660                self.select_window=int(self.select_window)
661
662    def slowcheckctl(self, *args, **kwargs):
663        if args[0] == 'reset':
664            self.default_slow_check_clients = 0
665        if args[0] == '+':
666            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
667                if self.default_slow_check_clients == 0:
668                    self.default_slow_check_clients = 1
669                else:
670                    self.default_slow_check_clients = self.default_slow_check_clients * 2
671
672
673    def schedule(self, *args, **kwargs):
674        if kwargs['event'] == 'NO_MORE_CLIENTS':
675            self.windowctl('-')
676            self.slowcheckctl('+')
677
678        if kwargs['event'] == 'HAVE_CLIENTS':
679
680            if kwargs['nclients'] == self.select_window:
681                self.windowctl('+')
682            else:
683                self.windowctl('-')
684            self.slowcheckctl('reset')
685#
686#                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
687#                    self.windowctl('+')
688#
689#                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
690#                    self.windowctl('-')
691#                    self.slowcheckctl('+')
692#                else:
693#                    self.windowctl('+')
694#
695#            elif kwargs['nclients'] < self.select_window/2:
696#                self.windowctl('-')
697#
698#            self.slowcheckctl('reset')
699
700        if kwargs['event']=='CHECK_SANITY':
701            # CPU SETTINGS
702            if not self.terminate and self.load > LIMIT:
703                self.paused = True
704            else:
705                self.paused = False
706
707            # DB MEM SETTINGS
708            if self.max_heap and self.temporary_tables_size['sum']:
709                if self.temporary_tables_size['sum'] > self.max_heap * 0.4:
710                    self.windowctl('+')
711                    if self.paused:
712                        #printea('Hitting max temporary table size unpausing','critical')
713                        self.paused = False
714            # SERVER MEM
715            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
716                #printea('Hitting max memory from server collecting and reducing window','critical')
717                self.windowctl('-')
718                self.slowcheckctl('+')
719                self.USE_MAX_QUEUES=True
720                for x in THREADS:
721                    self.db[x].need_clean=True
722                gc.collect()
723            else:
724#                self.USE_MAX_QUEUES=False
725                for x in THREADS:
726                    self.db[x].need_clean=False
727
728            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
729                self.windowctl('-')
730                self.slowcheckctl('+')
731
732            # QUEUES
733            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
734                self.windowctl('+')
735
736
737    def get_cpu_load(self):
738        self.loadlist.append(psutil.cpu_percent())
739        self.loadlist=self.loadlist[1:]
740        avg=0.0
741        for x in self.loadlist:
742            avg+=x
743        return round(avg/100.0,2)
744
745    def put_config(self,key='',value=''):
746        pass
747
748    def term(self,*args,**kwargs):
749        printea("Begin kill the program, wait please...",'info')
750        self.terminate=True
751
752    def prepare_threads(self):
753        global DAEMON
754
755        self.threads['load']=Process(target=self.get_load)
756        self.threads['load'].daemon = DAEMON
757        self.threads['load'].start()
758        for x in THREADS:
759            self.threads[x]=Process(target=self.db[x].worker)
760            self.threads[x].daemon = DAEMON
761            self.threads[x].start()
762
763    def get_mem_usage(self):
764        process = psutil.Process(os.getpid())
765        mem = process.memory_info()[0] / float(2 ** 20)
766        return mem
767
768    def get_server_free_mem(self):
769        mem = psutil.virtual_memory()
770        return mem.free / (2 ** 20)
771
772    def print_stats(self):
773        global CHECK_LOAD_TIME
774#        if DEBUG:
775        if True:
776            out="Processed: "
777            out2=''
778            total=0
779            for x in THREADS:
780                out2+='{}={} '.format(x,self.db[x].processed)
781                self.cfg.store('processed '+x,self.db[x].processed)
782                if THREADED:
783                    if x != 'main':
784                        total += self.db[x].processed
785                else:
786                    total += self.db[x].processed
787            out += "TOTAL={} {}".format(total,out2)
788            self.cfg.store('processed total',total)
789            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
790            self.procesed_per_sec.append(proc_per_sec)
791            self.procesed_per_sec=self.procesed_per_sec[-10:]
792            f=lambda x,y:x+y
793            suma_proc=reduce(f,self.procesed_per_sec)
794            self.cfg.store('processing at',int(suma_proc/10))
795            self.procesed=total
796            total = 0
797            if THREADED:
798                out+="Queues: "
799                sizes=self.get_q_sizes()
800                out2=''
801                for x in sizes:
802                    self.cfg.store('queue '+x,sizes[x])
803                    out2+='{}={} '.format(x,sizes[x])
804                    total += sizes[x]
805                self.cfg.store('queue totals',total)
806                out+="TOTAL={} {}".format(total,out2)
807            self.cfg.store('select window',self.select_window)
808            self.cfg.store('mem used',self.MEM_USED)
809            self.cfg.store('load',self.load)
810            if (self.paused):
811                self.cfg.store('paused',1)
812            else:
813                self.cfg.store('paused',0)
814            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
815            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
816            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
817            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
818            if DEBUG:
819                if THREADED:
820                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
821                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
822                else:
823                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
824                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
825
826    def get_q_sizes(self):
827        sizes={}
828        for x in THREADS:
829            sizes[x]=self.db[x].q.qsize()
830        return sizes
831
832    def sum_q_sizes(self):
833        sizes=self.get_q_sizes()
834        f=lambda x,y: x+y
835        return reduce(f,[sizes[x] for x in sizes])
836
837    def all_queues_empty(self):
838        sizes=self.get_q_sizes()
839        for x in sizes:
840            if sizes[x] != 0:
841                return False
842        return True
843
844    def get_load(self):
845        # runs on separate thread
846        db = DB(self,'load')
847        db.init_db()
848        self.cfg = Config(db)
849        ctime=0
850        while not (self.terminate and self.finished): #and not self.paused
851            self.cfg.store('keepalive',int(time.time()))
852            time.sleep(1)
853            self.load=self.get_cpu_load()
854            ctime+=1
855            self.schedule(event='CHECK_SANITY')
856            if ctime >CHECK_LOAD_TIME:
857                self.cfg.write()
858                ctime=0.0
859
860                self.MEM_USED=self.get_mem_usage()
861                self.server_mem=self.get_server_free_mem()
862
863                if self.server_variables and 'max_heap_table_size' in self.server_variables:
864                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
865
866                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size('tmp_clients')
867                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size('tmp_packages')
868                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size('Client_Versions')
869                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size('RecvPackages')
870                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
871                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
872                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
873                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
874                qempty = self.all_queues_empty()
875                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
876                    db.reset_autoinc()
877                self.print_stats()
878                #self.load=os.getloadavg()[0]
879                if qempty:
880                    gc.collect()
881            #end if
882        #end while
883        db.close_db()
884
885    def start(self):
886        self.prepare_threads()
887
888    def end(self):
889        for x in self.db:
890            self.threads[x].join()
891            self.db[x].close_db()
892        self.finished = True
893        self.print_stats()
894
895
896class Config():
897    def __init__(self,connection):
898        self._db = connection
899        self.read()
900
901    def store(self,var,value):
902        var=var.replace(' ','_')
903        if isinstance(value,str):
904            if value.isnumeric():
905                setattr(self,var,int(value))
906            else:
907                setattr(self,var,str(value))
908        else:
909            setattr(self,var,int(value))
910
911    def write(self):
912        if self._db:
913            values={}
914            for x in self.get_internal_vars():
915                values.setdefault(str(x),str(getattr(self,x)))
916            self._db.put_config(values)
917
918    def read(self):
919        if self._db:
920            config=self._db.get_config()
921            for key in config.keys():
922                if config[key].isnumeric():
923                    setattr(self,key,int(config[key]))
924                else:
925                    setattr(self,key,config[key])
926        else:
927            print('No config yet')
928
929    def get_internal_vars(self):
930        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
931
932    def print(self):
933        for v in self.get_internal_vars():
934            print('{} = {}'.format(v,getattr(self,v)))
935
936
937def main(*args,**kwargs):
938    gc.enable()
939    if DAEMON:
940        fp = open('/var/run/analyticsd.pid','w');
941        fp.write(str(os.getpid()))
942        fp.close()
943    m = Monitor()
944    m.start()
945    printea("start done",'info')
946    while not m.terminate:
947        time.sleep(0.5)
948    m.end()
949    printea("Exitting...",'info')
950
951if __name__ == "__main__":
952    size=len([ p.name() for p in psutil.process_iter() if p.name() == 'analyticsd' ])
953    if size > 1:
954        printea("Another daemon running... exitting now!")
955        sys.exit(1)
956    lck = '/var/run/analyticsd.lock'
957    if DAEMON:
958        if os.path.isfile(lck):
959            printea('Lockfile {} detected, unable to start'.format(lck),'error')
960            sys.exit(1)
961        else:
962            try:
963                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
964                    main()
965            except Exception as e:
966                printea(e)
967                sys.exit(1)
968    else:
969        main()
Note: See TracBrowser for help on using the repository browser.