source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 6926

Last change on this file since 6926 was 6926, checked in by mabarracus, 2 years ago

Fix bugs

  • Property svn:executable set to *
File size: 36.9 KB
Line 
1#!/usr/bin/env python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7#import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39MAX_SLOW_CHECK_CLIENTS = 30
40
41if DEBUG:
42    if MIN_LOG_LEVEL:
43        loglevel=MIN_LOG_LEVEL
44    else:
45        loglevel=logging.DEBUG
46else:
47    loglevel=logging.INFO
48
49LOGGING = {
50    'version': 1,
51    'disable_existing_loggers': False,
52    'formatters': {
53        'verbose': {
54            'format': '%(levelname)s %(module)s %(message)s'
55            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
56            },
57        },
58    'handlers': {
59        'stdout': {
60            'class': 'logging.StreamHandler',
61            'stream': sys.stdout,
62            'formatter': 'verbose',
63            },
64        'sys-logger6': {
65            'class': 'logging.handlers.SysLogHandler',
66            'address': '/dev/log',
67            'facility': "local6",
68            'formatter': 'verbose',
69            },
70        },
71    'loggers': {
72        'analyticsd-logger': {
73            'handlers': ['sys-logger6','stdout'],
74            'level': loglevel,
75            'propagate': True,
76            },
77        }
78    }
79
80def to_str(x):
81    if isinstance(x,str):
82        return to_utf(x).decode('unicode_escape')
83    else:
84        return x
85
86def to_utf(x):
87    if isinstance(x,str):
88        return x.encode('utf-8')
89    else:
90        return x
91
92def printea(msg="",level='critical'):
93    if level == 'critical':
94        logger.critical(msg)
95    elif level == 'error':
96        logger.error(msg)
97    elif level == 'warning':
98        logger.warning(msg)
99    elif level == 'info':
100        logger.info(msg)
101    else:
102        logger.debug(msg)
103
104def keepalive(who=""):
105    global DEBUG_PRINT_ALIVE_COUNTER
106    t=str(int(time.time()))
107    if DEBUG:
108        pass
109        # too much verbose
110        # printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
111    else:
112        if who == 'main':
113            if DEBUG_PRINT_ALIVE_COUNTER > 0:
114                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
115            else:
116                DEBUG_PRINT_ALIVE_COUNTER=60
117                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
118
119    fp = open('/var/run/analyticsd.keepalive','w');
120    fp.write(t)
121    fp.close()
122
123config.dictConfig(LOGGING)
124logger = logging.getLogger('analyticsd-logger')
125
126DBNAME=None
127USER=None
128PASS=None
129IP=None
130
131EMPTY_PAUSED_SLEEP=1
132CHECK_LOAD_TIME=5
133MAX_RETRIES=5
134TIMED_ON=False
135
136try:
137    with open(FILE,'r') as f:
138        for line in f:
139            if not IP:
140                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
141                if IP:
142                    IP = IP.group(1)
143            if not DBNAME:
144                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
145                if DBNAME:
146                    DBNAME = DBNAME.group(1)
147            if not USER:
148                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
149                if USER:
150                    USER = USER.group(1)
151            if not PASS:
152                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
153                if PASS:
154                    PASS = PASS.group(1)
155    if not (IP or DBNAME or USER or PASS):
156        printea("Couldn't get database configuration from {}".format(FILE))
157    else:
158        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
159except Exception as e:
160    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
161    sys.exit(1)
162
163if not (IP or DBNAME or USER or PASS):
164    printea("Couldn't get database configuration from {}".format(FILE))
165    sys.exit(1)
166
167if THREADED:
168    THREADS=['main','server','client','desktop','other']
169else:
170    THREADS=['main']
171
172class DB():
173    def __init__(self,mon,t='main'):
174        self.t=t
175        self.reconnect=0
176        self.empty = False
177        self.conn=None
178        self.mon=mon
179        self.q=queue.Queue()
180        self.processed=0
181        self.need_clean = False
182        printea('Database worker {} initialized'.format(t),'info')
183
184    def timed(func):
185        @wraps(func)
186        def wrapper(*args,**kwargs):
187            if TIMED_ON:
188                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
189            ret=func(*args,**kwargs)
190            if TIMED_ON:
191                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
192            return ret
193        return wrapper
194
195    def with_retry(func):
196        @wraps(func)
197        def wrapper(*args,**kwargs):
198            if 'retry' not in kwargs:
199                kwargs['retry']=1
200            if 'mon' not in kwargs:
201                kwargs['mon'] = None
202            try:
203                return func(*args,**kwargs) 
204            except Exception as e:
205                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
206                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
207                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
208                    if kwargs['mon']:
209                        kwargs['mon'].term()
210                        sys.exit(1)
211                    return None
212                else:
213                    time.sleep(kwargs['retry']**2)
214                    kwargs['retry']+=1
215                    return wrapper(*args,**kwargs)
216            return result
217        return wrapper
218
219    def with_debug(func):
220        @wraps(func)
221        def wrapper(*args,**kwargs):
222            if 'query' in kwargs:
223                printea("executing query: {}".format(kwargs['query']),'debug')
224            return func(*args,**kwargs)
225        return wrapper
226
227    @with_debug
228    def execute(self,*args,**kwargs):
229        if 'query' not in kwargs:
230            printea("Warning execute called whithout query",'info')
231            return None
232        try:
233            return self.cur.execute(kwargs['query'])
234        except Exception as e:
235            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
236
237    def init_db(self):
238        try:
239            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
240            self.conn.autocommit(False)
241            self.cur = self.conn.cursor()
242            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
243            variables=self.check_server_variables()
244
245            if variables:
246                self.mon.server_variables=variables
247
248            printea("Connected succesfully {}".format(self.t),'info')
249
250        except mdb.Error as e:
251            printea("Error {}: {}".format(e.args[0],e.args[1]))
252            raise Exception(e)
253
254    def check_server_variables(self):
255        if self.t != 'main':
256            return None
257        else:
258            printea('Getting server variables','info')
259            result = {}
260            self.cur.execute("show global variables")
261            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
262            if self.cur.rowcount > 0:
263                for i in range(self.cur.rowcount):
264                    var_name, var_value = self.cur.fetchone()
265                    result.setdefault(var_name,var_value)
266            return result
267
268    def get_config(self):
269        values={}
270        self.cur.execute('select name,value from Config')
271        for val in self.cur.fetchall():
272            values.setdefault(val[0],val[1])
273        return values
274
275    def put_config(self,values):
276        vals=[]
277        for x in values.keys():
278            vals.append("('{}','{}')".format(x,values[x]))
279        try:
280            self.cur.execute("insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
281            self.conn.commit()
282        except Exception as e:
283            print(e)
284
285    def check_temporary_tables_size(self,table_name):
286        if self.t != 'load':
287            return None
288        else:
289            size = 0
290            rows = 0
291            self.cur.execute('select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
292
293            res=self.cur.fetchone()
294            size = float(res[0])
295            rows = int(res[1])
296            return (int(size),rows)
297
298    def close_db(self):
299        if self.conn:
300            self.conn.close()
301            printea("Closed connection {}".format(self.t),'info')
302
303    def reduce_flavour(self,version,flavour):
304        if version == '15':
305            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
306                return 'desktop'
307            elif 'server' in flavour:
308                return 'server'
309            elif 'client' in flavour:
310                return 'client' 
311        elif version == '16':
312            if 'server' in flavour:
313                return 'server'
314            elif 'client' in flavour:
315                return 'client'
316            elif 'desktop' in flavour:
317                return 'desktop'
318        return 'other'
319
320    def reduce_version(self,version):
321        if version[0:2] in ['15','16']:
322            return version[0:2]
323        else:
324            return 'other'
325
326    def gen_uuid(self,*args,**kwargs):
327        try:
328            string=to_utf(u'-'.join([str(x) for x in args]))
329            h=hashlib.sha1(string)
330            return int(h.hexdigest()[0:16],16)
331        except Exception as e:
332            print(traceback.format_exc())
333
334    @timed
335    @with_retry
336    def get_client(self,*args,**kwargs):
337        try:
338            query="SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
339            self.execute(query=query)
340            ret =[]
341            if self.cur.rowcount > 0:
342                for i in range(self.cur.rowcount):
343                    v_id,v_date,v_user,v_version,v_flavour,v_arch,v_mem,v_vga,v_cpu,v_ncpu=self.cur.fetchone()
344                    version=self.reduce_version(v_version)
345                    flavour=self.reduce_flavour(version,v_flavour)
346                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
347                    if not v_arch:
348                        v_arch = 'NULL'
349                    if not v_mem:
350                        v_mem = 'NULL'
351                    if not v_vga:
352                        v_vga = 'NULL'
353                    if not v_cpu:
354                        v_cpu = 'NULL'
355                    if not v_ncpu:
356                        v_ncpu = 'NULL'
357                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour,'arch':v_arch,'mem':v_mem,'vga':v_vga,'cpu':v_cpu,'ncpu':v_ncpu})
358                return ret
359            else:
360                return True
361        except Exception as e:
362            raise Exception("Error getting client: {}".format(e))
363
364    @timed
365    @with_retry
366    def get_apps(self,*args,**kwargs):
367        if 'clients' not in kwargs:
368            printea("Warning executed without named parameter clients",'info')
369            return None
370        ret = {}
371        try:
372            cids_to_rel_fla={}
373            for c in kwargs['clients']:
374                if c['id'] not in cids_to_rel_fla:
375                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
376                if c['flavour'] not in ret:
377                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
378            for c in kwargs['clients']:
379                ret[c['flavour']]['clients'].append(c)
380
381            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
382            self.execute(query=query)
383            if self.cur.rowcount > 0:
384                for i in range(self.cur.rowcount):
385                    row = self.cur.fetchone()
386                    clid = row[0]
387                    rel,fla = cids_to_rel_fla[clid]
388                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
389                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
390            return ret
391        except Exception as e:
392            raise Exception("Error getting apps: {}".format(e))
393
394    @timed
395    @with_retry
396    def put_client(self,*args,**kwargs):
397        if 'client_list' not in kwargs:
398            raise Exception("Called without named parameter client_list")
399        values= []
400        for cli in kwargs['client_list']:
401            values.append("({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{})".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour'],cli['arch'],cli['mem'],cli['vga'],cli['cpu'],cli['ncpu']))
402        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
403        self.execute(query=query)
404        return True
405
406    @timed
407    @with_retry
408    def put_apps(self,*args,**kwargs):
409        if 'apps' not in kwargs:
410            raise Exception("Called without named parameter apps")
411        app_list = {}
412        for app in kwargs['apps']:
413            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
414            if str(app['uuid']) not in app_list:
415                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
416            else:
417                app_list[str(app['uuid'])]['value']+=app['value']
418        values = []
419        for app in app_list:
420            item=app_list[app]
421            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
422        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
423        self.execute(query=query)
424        return True
425
426    @timed
427    @with_retry
428    def del_client(self,*args,**kwargs):
429        if 'client_list' not in kwargs:
430            raise Exception("Called without named parameter client_list")
431        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
432        self.execute(query=query)
433        return True
434
435    @timed
436    @with_retry
437    def del_apps(self,*args,**kwargs):
438        if 'client_list' not in kwargs:
439            raise Exception("Called without named parameter client_list")
440        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
441        self.execute(query=query)
442        return True
443
444    @timed
445    def reset_autoinc(self):
446        try:
447            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
448            self.execute(query=query)
449            ainc = self.cur.fetchone()[0]
450            #query = "SELECT count(*) from tmp_clients"
451            #self.execute(query=query)
452            #cli_size=self.cur.fetchone()[0]
453            #query = "SELECT count(*) from tmp_packages"
454            #self.execute(query=query)
455            #pkg_size=self.cur.fetchone()[0]
456            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
457            if ainc > 65500 and self.mon.all_queues_empty():
458            #and cli_size == 0 and pkg_size == 0:
459                query = "TRUNCATE TABLE tmp_clients"
460                self.execute(query=query)
461                query = "TRUNCATE TABLE tmp_packages"
462                self.execute(query=query)
463            return True
464        except Exception as e:
465            raise Exception("Error reseting auto_increment: {}".format(e))
466
467
468    @timed
469    def process_main_thread(self,*args,**kwargs):
470        if self.mon.slow_check_clients > 0 :
471            self.mon.slow_check_clients -= 1
472            time.sleep(1)
473            return True
474        else:
475            self.mon.slow_check_clients = self.mon.default_slow_check_clients
476
477        clis=self.get_client(mon=self.mon)
478        if clis == True: #No clients found (empty)
479            self.empty = True # if > 65500 auto_increment was reset
480            self.mon.schedule(event='NO_MORE_CLIENTS')
481            return False
482        #clients found
483        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
484        # if clients found get apps
485        lapps = self.get_apps(clients=clis,mon=self.mon)
486        #lapps can be empty
487        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
488        #If deletion was failed , thread was died
489        lapps_tmp={'apps':[],'clients':[]}
490        for fla in lapps:
491            if THREADED:
492                lapps[fla]['timestamp']=time.time()
493                self.mon.db[fla].q.put(lapps[fla],True)
494            else:
495                lapps_tmp['apps'].extend(lapps[fla]['apps'])
496                lapps_tmp['clients'].extend(lapps[fla]['clients'])
497                self.mon.db['main'].q.put(lapps_tmp,True)
498        #if DEBUG:
499        self.processed+=len(clis)
500        return True
501
502    @timed
503    def process_all_threads(self,*args,**kwargs):
504        lapps=self.q.get(True)
505        #print "Running {}".format(self.t)
506        if THREADED:
507            while (lapps['timestamp'] > self.mon.commited):
508                time.sleep(0.001)
509        if len(lapps['clients']) != 0:
510            printea('Thread {} putting client'.format(self.t),'debug')
511            #IF FAIL, AFTER RETRIES THREAD DIES
512            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
513                self.q.put(lapps,True) # USELESS
514                return False    # USELESS
515        if len(lapps['apps']) != 0:
516            printea('Thread {} putting clientapps'.format(self.t),'debug')
517            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
518                self.q.put(lapps,True) # USELESS
519                return False    # USELESS
520            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
521                self.q.put(lapps,True) # USELESS
522                return False    # USELESS
523        #if DEBUG:
524        self.processed+=len(lapps['clients'])
525        return True
526
527    def process(self,*args,**kwargs):
528        keepalive(self.t)
529        # warning too much verbose, printea("Running thread {}".format(self.t),'debug')
530        if self.t == 'main' and not self.mon.terminate:
531            ret=self.process_main_thread(*args,**kwargs)
532            if ret == False: #No more clients available
533                return True
534            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
535                if THREADED:
536                    return True #Need return to avoid put empty flag and wait into main thread
537
538        #after this poing, code for all threads
539        if not self.q.empty():
540            ret=self.process_all_threads(*args,**kwargs)
541            if ret == False: # USELESS? THREADS was died
542                printea("Error threads")
543                return ret
544        else: 
545            del self.q
546            self.q = queue.Queue()
547            self.empty = True
548        return True
549
550    def worker(self):
551        printea("Starting worker {} processing".format(self.t),'info')
552        while not (self.mon.terminate and self.empty):
553            if self.mon.paused or self.empty:
554                #if self.empty and self.t == 'main':
555                #    self.reset_autoinc()
556                if self.mon.paused:
557                    printea("Paused by high load {}".format(self.t),'debug')
558                # Too much verbose
559                #if self.empty:
560                #    printea("Empty queue {} sleeping by now".format(self.t),'debug')
561                if self.empty:
562                    self.empty = False
563                time.sleep(EMPTY_PAUSED_SLEEP)
564            else:
565                try:
566                    if self.conn == None:
567                        raise EnvironmentError('Connection not available, probably it\'s a first run')
568                    else:
569                        self.conn.begin()
570                    if self.process():
571                        self.conn.commit()
572                        self.mon.commited=time.time()
573                        self.reconnect = 0
574                        if self.need_clean:
575                            gc.collect()
576                    else:
577                        self.conn.rollback()
578                except EnvironmentError as e:
579                    printea(e)
580                    self.init_db()
581
582                except Exception as e:
583                    try:
584                        if self.conn != None:
585                            self.conn.rollback()
586                    except:
587                        printea("Can't rollback last actions",'info')
588                        pass
589                    #if e[0] != 2006:
590                    #    printea("Exception processing worker({}): {}".format(self.t,e))
591                    #if e[0] == 2006: # SERVER GONE AWAY
592                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
593
594                    printea("Trying to recover connection ({}), {}".format(self.t,e))
595                    if self.reconnect == 100:
596                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
597                        self.mon.term()
598                    else:
599                        self.reconnect+=1
600                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
601                        time.sleep(self.reconnect*self.reconnect)
602
603                        try:
604                            self.init_db()
605                            printea("Recovered worker {} connection".format(self.t),'info')
606                        except:
607                            printea('Unable to initialize worker {}'.format(self.t))
608                            pass
609
610
611class Monitor():
612    def __init__(self):
613        self.MAX_QUEUE_UTILIZATION = 100
614        self.USE_MAX_QUEUES = True
615        self.MAX_SELECT_WINDOW = (2 ** 13) +1
616        self.MIN_SELECT_WINDOW = 32
617        self.MEM_USED=0
618        self.MAX_MEM=512
619        self.MIN_FREE_MEM_SERVER=100
620        self.USE_MAX_MEM=True
621        self.lock = Lock()
622        self.terminate = False
623        self.finished = False
624        self.paused = False
625        self.select_window = self.MIN_SELECT_WINDOW
626        self.commited = time.time()
627        self.procesed = 0
628        self.procesed_per_sec = [0]*10
629        self.load = 0
630        self.server_variables = None  # initialized by main worker
631        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
632        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
633        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
634        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
635        self.default_slow_check_clients = 1
636        self.slow_check_clients = self.default_slow_check_clients
637        self.server_mem = 0
638        self.loadlist = [ 0.0 ] * 100
639        self.max_heap = None
640
641        signal.signal(signal.SIGQUIT,self.term)
642        signal.signal(signal.SIGTERM,self.term)
643        signal.signal(signal.SIGINT,self.term)
644
645        self.db = {}
646        self.threads = {}
647        for x in THREADS:
648            self.db[x] = DB(self,x)
649            #try:
650            #    self.db[x].init_db()
651            #except Exception as e:
652            #    printea('Error initializing database connections: {}'.format(str(e)))
653            #    sys.exit(1)
654        self.cfg= None
655        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
656
657    def windowctl(self, *args, **kwargs):
658        if args[0] == '+':
659            if self.select_window*2 < self.MAX_SELECT_WINDOW:
660                self.select_window*=2
661                self.select_window=int(self.select_window)
662        if args[0] == '-':
663            if self.select_window > self.MIN_SELECT_WINDOW:
664                self.select_window/=2
665                self.select_window=int(self.select_window)
666
667    def slowcheckctl(self, *args, **kwargs):
668        if args[0] == 'reset':
669            self.default_slow_check_clients = 0
670        if args[0] == '+':
671            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
672                if self.default_slow_check_clients == 0:
673                    self.default_slow_check_clients = 1
674                else:
675                    self.default_slow_check_clients = self.default_slow_check_clients * 2
676
677
678    def schedule(self, *args, **kwargs):
679        if kwargs['event'] == 'NO_MORE_CLIENTS':
680            self.windowctl('-')
681            self.slowcheckctl('+')
682
683        if kwargs['event'] == 'HAVE_CLIENTS':
684
685            if kwargs['nclients'] == self.select_window:
686                self.windowctl('+')
687            else:
688                self.windowctl('-')
689            self.slowcheckctl('reset')
690#
691#                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
692#                    self.windowctl('+')
693#
694#                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
695#                    self.windowctl('-')
696#                    self.slowcheckctl('+')
697#                else:
698#                    self.windowctl('+')
699#
700#            elif kwargs['nclients'] < self.select_window/2:
701#                self.windowctl('-')
702#
703#            self.slowcheckctl('reset')
704
705        if kwargs['event']=='CHECK_SANITY':
706            # CPU SETTINGS
707            if not self.terminate and self.load > LIMIT:
708                self.paused = True
709            else:
710                self.paused = False
711
712            # DB MEM SETTINGS
713            if self.max_heap and self.temporary_tables_size['sum']:
714                if self.temporary_tables_size['sum'] > self.max_heap * 0.4:
715                    self.windowctl('+')
716                    if self.paused:
717                        #printea('Hitting max temporary table size unpausing','critical')
718                        self.paused = False
719            # SERVER MEM
720            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
721                #printea('Hitting max memory from server collecting and reducing window','critical')
722                self.windowctl('-')
723                self.slowcheckctl('+')
724                self.USE_MAX_QUEUES=True
725                for x in THREADS:
726                    self.db[x].need_clean=True
727                gc.collect()
728            else:
729#                self.USE_MAX_QUEUES=False
730                for x in THREADS:
731                    self.db[x].need_clean=False
732
733            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
734                self.windowctl('-')
735                self.slowcheckctl('+')
736
737            # QUEUES
738            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
739                self.windowctl('+')
740
741
742    def get_cpu_load(self):
743        self.loadlist.append(psutil.cpu_percent())
744        self.loadlist=self.loadlist[1:]
745        avg=0.0
746        for x in self.loadlist:
747            avg+=x
748        return round(avg/100.0,2)
749
750    def put_config(self,key='',value=''):
751        pass
752
753    def term(self,*args,**kwargs):
754        printea("Begin kill the program, wait please...",'info')
755        self.terminate=True
756
757    def prepare_threads(self):
758        global DAEMON
759
760        self.threads['load']=Process(target=self.get_load)
761        self.threads['load'].daemon = DAEMON
762        self.threads['load'].start()
763        for x in THREADS:
764            self.threads[x]=Process(target=self.db[x].worker)
765            self.threads[x].daemon = DAEMON
766            self.threads[x].start()
767
768    def get_mem_usage(self):
769        process = psutil.Process(os.getpid())
770        mem = process.memory_info()[0] / float(2 ** 20)
771        return mem
772
773    def get_server_free_mem(self):
774        mem = psutil.virtual_memory()
775        return mem.free / (2 ** 20)
776
777    def print_stats(self):
778        global CHECK_LOAD_TIME
779#        if DEBUG:
780        if True:
781            out="Processed: "
782            out2=''
783            total=0
784            for x in THREADS:
785                out2+='{}={} '.format(x,self.db[x].processed)
786                self.cfg.store('processed '+x,self.db[x].processed)
787                if THREADED:
788                    if x != 'main':
789                        total += self.db[x].processed
790                else:
791                    total += self.db[x].processed
792            out += "TOTAL={} {}".format(total,out2)
793            self.cfg.store('processed total',total)
794            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
795            self.procesed_per_sec.append(proc_per_sec)
796            self.procesed_per_sec=self.procesed_per_sec[-10:]
797            f=lambda x,y:x+y
798            suma_proc=reduce(f,self.procesed_per_sec)
799            self.cfg.store('processing at',int(suma_proc/10))
800            self.procesed=total
801            total = 0
802            if THREADED:
803                out+="Queues: "
804                sizes=self.get_q_sizes()
805                out2=''
806                for x in sizes:
807                    self.cfg.store('queue '+x,sizes[x])
808                    out2+='{}={} '.format(x,sizes[x])
809                    total += sizes[x]
810                self.cfg.store('queue totals',total)
811                out+="TOTAL={} {}".format(total,out2)
812            self.cfg.store('select window',self.select_window)
813            self.cfg.store('mem used',self.MEM_USED)
814            self.cfg.store('load',self.load)
815            if (self.paused):
816                self.cfg.store('paused',1)
817            else:
818                self.cfg.store('paused',0)
819            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
820            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
821            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
822            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
823            if DEBUG:
824                if THREADED:
825                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
826                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
827                else:
828                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
829                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
830
831    def get_q_sizes(self):
832        sizes={}
833        for x in THREADS:
834            sizes[x]=self.db[x].q.qsize()
835        return sizes
836
837    def sum_q_sizes(self):
838        sizes=self.get_q_sizes()
839        f=lambda x,y: x+y
840        return reduce(f,[sizes[x] for x in sizes])
841
842    def all_queues_empty(self):
843        sizes=self.get_q_sizes()
844        for x in sizes:
845            if sizes[x] != 0:
846                return False
847        return True
848
849    def get_load(self):
850        # runs on separate thread
851        db = DB(self,'load')
852        db.init_db()
853        self.cfg = Config(db)
854        ctime=0
855        while not (self.terminate and self.finished): #and not self.paused
856            self.cfg.store('keepalive',int(time.time()))
857            time.sleep(1)
858            self.load=self.get_cpu_load()
859            ctime+=1
860            self.schedule(event='CHECK_SANITY')
861            if ctime >CHECK_LOAD_TIME:
862                self.cfg.write()
863                ctime=0.0
864
865                self.MEM_USED=self.get_mem_usage()
866                self.server_mem=self.get_server_free_mem()
867
868                if self.server_variables and 'max_heap_table_size' in self.server_variables:
869                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
870
871                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size('tmp_clients')
872                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size('tmp_packages')
873                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size('Client_Versions')
874                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size('RecvPackages')
875                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
876                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
877                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
878                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
879                qempty = self.all_queues_empty()
880                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
881                    db.reset_autoinc()
882                self.print_stats()
883                #self.load=os.getloadavg()[0]
884                if qempty:
885                    gc.collect()
886            #end if
887        #end while
888        db.close_db()
889
890    def start(self):
891        self.prepare_threads()
892
893    def end(self):
894        for x in self.db:
895            self.threads[x].join()
896            self.db[x].close_db()
897        self.finished = True
898        self.print_stats()
899
900
901class Config():
902    def __init__(self,connection):
903        self._db = connection
904        self.read()
905
906    def store(self,var,value):
907        var=var.replace(' ','_')
908        if isinstance(value,str):
909            if value.isnumeric():
910                setattr(self,var,int(value))
911            else:
912                setattr(self,var,str(value))
913        else:
914            setattr(self,var,int(value))
915
916    def write(self):
917        if self._db:
918            values={}
919            for x in self.get_internal_vars():
920                values.setdefault(str(x),str(getattr(self,x)))
921            self._db.put_config(values)
922
923    def read(self):
924        if self._db:
925            config=self._db.get_config()
926            for key in config.keys():
927                if config[key].isnumeric():
928                    setattr(self,key,int(config[key]))
929                else:
930                    setattr(self,key,config[key])
931        else:
932            print('No config yet')
933
934    def get_internal_vars(self):
935        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
936
937    def print(self):
938        for v in self.get_internal_vars():
939            print('{} = {}'.format(v,getattr(self,v)))
940
941
942def main(*args,**kwargs):
943    gc.enable()
944    if DAEMON:
945        fp = open('/var/run/analyticsd.pid','w');
946        fp.write(str(os.getpid()))
947        fp.close()
948    m = Monitor()
949    m.start()
950    printea("start done",'info')
951    while not m.terminate:
952        time.sleep(0.5)
953    m.end()
954    printea("Exitting...",'info')
955
956if __name__ == "__main__":
957    exit = False
958    keyword='analyticsd'
959    for proc in psutil.process_iter():
960        for argument in proc.cmdline():
961            #print('{} {} {}'.format(proc.cmdline(),keyword,argument[-len(keyword):]))
962            if keyword in argument[-len(keyword):]:
963                exit = True
964                break
965        if exit:
966            break
967    if exit:
968        printea("Another daemon running... exitting now!")
969        sys.exit(1)
970    lck = '/var/run/analyticsd.lock'
971    if DAEMON:
972        if os.path.isfile(lck):
973            printea('Lockfile {} detected, unable to start'.format(lck),'error')
974            sys.exit(1)
975        else:
976            try:
977                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
978                    main()
979            except Exception as e:
980                printea(e)
981                sys.exit(1)
982    else:
983        main()
Note: See TracBrowser for help on using the repository browser.