source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 7114

Last change on this file since 7114 was 7114, checked in by mabarracus, 2 years ago

wip scheduler improvements

  • Property svn:executable set to *
File size: 41.2 KB
Line 
1#!/usr/bin/env python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7#import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34WARNING_MIN_HEAP = 256
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39MAX_SLOW_CHECK_CLIENTS = 30
40USE_FILE_EVENT_LOG=True
41USE_FILE_EVENT_DB=False
42USE_CONFIG_FILE_LOG=True
43CONFIG_FILE_LOG='/var/run/analyticsd_config.log'
44FILE_EVENT_LOG='/var/run/analyticsd_event.log'
45
46if DEBUG:
47    if MIN_LOG_LEVEL:
48        loglevel=MIN_LOG_LEVEL
49    else:
50        loglevel=logging.DEBUG
51else:
52    loglevel=logging.INFO
53
54LOGGING = {
55    'version': 1,
56    'disable_existing_loggers': False,
57    'formatters': {
58        'verbose': {
59            'format': '%(levelname)s %(module)s %(message)s'
60            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
61            },
62        },
63    'handlers': {
64        'stdout': {
65            'class': 'logging.StreamHandler',
66            'stream': sys.stdout,
67            'formatter': 'verbose',
68            },
69        'sys-logger6': {
70            'class': 'logging.handlers.SysLogHandler',
71            'address': '/dev/log',
72            'facility': "local6",
73            'formatter': 'verbose',
74            },
75        },
76    'loggers': {
77        'analyticsd-logger': {
78            'handlers': ['sys-logger6','stdout'],
79            'level': loglevel,
80            'propagate': True,
81            },
82        }
83    }
84
85def to_str(x):
86    if isinstance(x,str):
87        return to_utf(x).decode('unicode_escape')
88    else:
89        return x
90
91def to_utf(x):
92    if isinstance(x,str):
93        return x.encode('utf-8')
94    else:
95        return x
96
97def printea(msg="",level='critical'):
98    if level == 'critical':
99        logger.critical(msg)
100    elif level == 'error':
101        logger.error(msg)
102    elif level == 'warning':
103        logger.warning(msg)
104    elif level == 'info':
105        logger.info(msg)
106    else:
107        logger.debug(msg)
108
109def keepalive(who=""):
110    global DEBUG_PRINT_ALIVE_COUNTER
111    t=str(int(time.time()))
112    if DEBUG:
113        pass
114        # too much verbose
115        # printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
116    else:
117        if who == 'main':
118            if DEBUG_PRINT_ALIVE_COUNTER > 0:
119                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
120            else:
121                DEBUG_PRINT_ALIVE_COUNTER=60
122                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
123
124    fp = open('/var/run/analyticsd.keepalive','w');
125    fp.write(t)
126    fp.close()
127
128config.dictConfig(LOGGING)
129logger = logging.getLogger('analyticsd-logger')
130
131DBNAME=None
132USER=None
133PASS=None
134IP=None
135
136EMPTY_PAUSED_SLEEP=1
137CHECK_LOAD_TIME=5
138MAX_RETRIES=5
139TIMED_ON=False
140
141try:
142    with open(FILE,'r') as f:
143        for line in f:
144            if not IP:
145                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
146                if IP:
147                    IP = IP.group(1)
148            if not DBNAME:
149                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
150                if DBNAME:
151                    DBNAME = DBNAME.group(1)
152            if not USER:
153                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
154                if USER:
155                    USER = USER.group(1)
156            if not PASS:
157                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
158                if PASS:
159                    PASS = PASS.group(1)
160    if not (IP or DBNAME or USER or PASS):
161        printea("Couldn't get database configuration from {}".format(FILE))
162    else:
163        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
164except Exception as e:
165    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
166    sys.exit(1)
167
168if not (IP or DBNAME or USER or PASS):
169    printea("Couldn't get database configuration from {}".format(FILE))
170    sys.exit(1)
171
172if THREADED:
173    THREADS=['main','server','client','desktop','other']
174else:
175    THREADS=['main']
176
177class DB():
178    def __init__(self,mon,t='main'):
179        self._initialized=False
180        self.t=t
181        self.reconnect=0
182        self.empty = False
183        self.conn=None
184        self.mon=mon
185        self.q=queue.Queue()
186        self.processed=0
187        self.need_clean = False
188        printea('Database worker {} initialized'.format(t),'info')
189
190    def timed(func):
191        @wraps(func)
192        def wrapper(*args,**kwargs):
193            if TIMED_ON:
194                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
195            ret=func(*args,**kwargs)
196            if TIMED_ON:
197                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
198            return ret
199        return wrapper
200
201    def with_retry(func):
202        @wraps(func)
203        def wrapper(*args,**kwargs):
204            if 'retry' not in kwargs:
205                kwargs['retry']=1
206            if 'mon' not in kwargs:
207                kwargs['mon'] = None
208            try:
209                return func(*args,**kwargs) 
210            except Exception as e:
211                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
212                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
213                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
214                    if kwargs['mon']:
215                        kwargs['mon'].term()
216                        sys.exit(1)
217                    return None
218                else:
219                    time.sleep(kwargs['retry']**2)
220                    kwargs['retry']+=1
221                    return wrapper(*args,**kwargs)
222            return result
223        return wrapper
224
225    def with_debug(func):
226        @wraps(func)
227        def wrapper(*args,**kwargs):
228            if 'query' in kwargs:
229                printea("executing query: {}".format(kwargs['query']),'debug')
230            return func(*args,**kwargs)
231        return wrapper
232
233    @with_debug
234    def execute(self,*args,**kwargs):
235        if 'query' not in kwargs:
236            printea("Warning execute called whithout query",'info')
237            return None
238        try:
239            return self.cur.execute(kwargs['query'])
240        except mdb.OperationalError as e:
241                printea('({}) Operational error on mysql, error is: {}'.format(self.t,e),'error')
242                self.init_db()
243        except Exception as e:
244                raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
245
246    def init_db(self):
247        try:
248            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
249            self.conn.autocommit(False)
250            self.cur = self.conn.cursor()
251            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
252            variables=self.check_server_variables()
253
254            if variables:
255                self.mon.server_variables=variables
256
257            printea("Connected succesfully {} thread_id={}".format(self.t,self.conn.thread_id()),'info')
258
259        except mdb.Error as e:
260            printea("Error {}: {}".format(e.args[0],e.args[1]))
261            raise Exception(e)
262
263    def check_server_variables(self):
264        if self.t != 'main':
265            return None
266        else:
267            printea('Getting server variables','info')
268            result = {}
269            self.cur.execute("show global variables")
270            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
271            if self.cur.rowcount > 0:
272                for i in range(self.cur.rowcount):
273                    var_name, var_value = self.cur.fetchone()
274                    result.setdefault(var_name,var_value)
275            return result
276
277    def get_config(self):
278        values={}
279        self.cur.execute('select name,value from Config')
280        for val in self.cur.fetchall():
281            values.setdefault(val[0],val[1])
282        return values
283
284    @with_retry
285    def put_config(self,*args,**kwargs):
286        values = kwargs.get('values')
287        vals=[]
288        for x in values.keys():
289            vals.append("('{}','{}')".format(x,values[x]))
290        try:
291            self.execute(query="insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
292            self.conn.commit()
293        except Exception as e:
294            printea(e,'error')
295
296    @with_retry
297    def check_temporary_tables_size(self,*args,**kwargs):
298        table_name=kwargs.get('table_name')
299        if self.t != 'load':
300            return None
301        else:
302            size = 0
303            rows = 0
304            self.execute(query='select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
305
306            res=self.cur.fetchone()
307            size = float(res[0])
308            rows = int(res[1])
309            return (int(size),rows)
310
311    def close_db(self):
312        if self.conn:
313            self.conn.close()
314            printea("Closed connection {}".format(self.t),'info')
315
316    def reduce_flavour(self,version,flavour):
317        if version == '15':
318            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
319                return 'desktop'
320            elif 'server' in flavour:
321                return 'server'
322            elif 'client' in flavour:
323                return 'client' 
324        elif version == '16':
325            if 'server' in flavour:
326                return 'server'
327            elif 'client' in flavour:
328                return 'client'
329            elif 'desktop' in flavour:
330                return 'desktop'
331        return 'other'
332
333    def reduce_version(self,version):
334        if version[0:2] in ['15','16']:
335            return version[0:2]
336        else:
337            return 'other'
338
339    def gen_uuid(self,*args,**kwargs):
340        try:
341            string=to_utf(u'-'.join([str(x) for x in args]))
342            h=hashlib.sha1(string)
343            return int(h.hexdigest()[0:16],16)
344        except Exception as e:
345            printea(traceback.format_exc(),'critical')
346
347    @timed
348    @with_retry
349    def get_client(self,*args,**kwargs):
350        try:
351            query="SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu,ltsp,mode from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
352            self.execute(query=query)
353            ret =[]
354            if self.cur.rowcount > 0:
355                for i in range(self.cur.rowcount):
356                    v_id,v_date,v_user,v_version,v_flavour,v_arch,v_mem,v_vga,v_cpu,v_ncpu,v_ltsp,v_mode=self.cur.fetchone()
357                    version=self.reduce_version(v_version)
358                    flavour=self.reduce_flavour(version,v_flavour)
359                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
360                    if not v_arch:
361                        v_arch = 'NULL'
362                    if not v_mem:
363                        v_mem = 'NULL'
364                    if not v_vga:
365                        v_vga = 'NULL'
366                    if not v_cpu:
367                        v_cpu = 'NULL'
368                    if not v_ncpu:
369                        v_ncpu = 'NULL'
370                    if v_ltsp == '1' or v_ltsp == True:
371                        v_ltsp = 'TRUE'
372                    elif v_ltsp == '0' or v_ltsp == False:
373                        v_ltsp = 'FALSE'
374                    else:
375                        v_ltsp = 'NULL'
376                    if not v_mode:
377                        v_mode='NULL'
378
379                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour,'arch':v_arch,'mem':v_mem,'vga':v_vga,'cpu':v_cpu,'ncpu':v_ncpu,'ltsp':v_ltsp,'mode':v_mode})
380                return ret
381            else:
382                return True
383        except Exception as e:
384            raise Exception("Error getting client: {}".format(e))
385
386    @timed
387    @with_retry
388    def get_apps(self,*args,**kwargs):
389        if 'clients' not in kwargs:
390            printea("Warning executed without named parameter clients",'info')
391            return None
392        ret = {}
393        try:
394            cids_to_rel_fla={}
395            for c in kwargs['clients']:
396                if c['id'] not in cids_to_rel_fla:
397                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
398                if c['flavour'] not in ret:
399                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
400            for c in kwargs['clients']:
401                ret[c['flavour']]['clients'].append(c)
402
403            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
404            self.execute(query=query)
405            if self.cur.rowcount > 0:
406                for i in range(self.cur.rowcount):
407                    row = self.cur.fetchone()
408                    clid = row[0]
409                    rel,fla = cids_to_rel_fla[clid]
410                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
411                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
412            return ret
413        except Exception as e:
414            raise Exception("Error getting apps: {}".format(e))
415
416    @timed
417    @with_retry
418    def put_client(self,*args,**kwargs):
419        if 'client_list' not in kwargs:
420            raise Exception("Called without named parameter client_list")
421        values= []
422        for cli in kwargs['client_list']:
423            values.append("({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{},{},'{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour'],cli['arch'],cli['mem'],cli['vga'],cli['cpu'],cli['ncpu'],cli['ltsp'],cli['mode']))
424        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`,`ltsp`,`mode`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
425        self.execute(query=query)
426        return True
427
428    @timed
429    @with_retry
430    def put_apps(self,*args,**kwargs):
431        if 'apps' not in kwargs:
432            raise Exception("Called without named parameter apps")
433        app_list = {}
434        for app in kwargs['apps']:
435            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
436            if str(app['uuid']) not in app_list:
437                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
438            else:
439                app_list[str(app['uuid'])]['value']+=app['value']
440        values = []
441        for app in app_list:
442            item=app_list[app]
443            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
444        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
445        self.execute(query=query)
446        return True
447
448    @timed
449    @with_retry
450    def del_client(self,*args,**kwargs):
451        if 'client_list' not in kwargs:
452            raise Exception("Called without named parameter client_list")
453        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
454        self.execute(query=query)
455        return True
456
457    @timed
458    @with_retry
459    def del_apps(self,*args,**kwargs):
460        if 'client_list' not in kwargs:
461            raise Exception("Called without named parameter client_list")
462        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
463        self.execute(query=query)
464        return True
465
466    @timed
467    def reset_autoinc(self):
468        try:
469            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
470            self.execute(query=query)
471            ainc = self.cur.fetchone()[0]
472            #query = "SELECT count(*) from tmp_clients"
473            #self.execute(query=query)
474            #cli_size=self.cur.fetchone()[0]
475            #query = "SELECT count(*) from tmp_packages"
476            #self.execute(query=query)
477            #pkg_size=self.cur.fetchone()[0]
478            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
479            if ainc > 65500 and self.mon.all_queues_empty():
480            #and cli_size == 0 and pkg_size == 0:
481                query = "TRUNCATE TABLE tmp_clients"
482                self.execute(query=query)
483                query = "TRUNCATE TABLE tmp_packages"
484                self.execute(query=query)
485            return True
486        except Exception as e:
487            raise Exception("Error reseting auto_increment: {}".format(e))
488
489
490    @timed
491    def process_main_thread(self,*args,**kwargs):
492        if self.mon.slow_check_clients > 0 :
493            self.mon.slow_check_clients -= 1
494            time.sleep(1)
495            return True
496        else:
497            self.mon.slow_check_clients = self.mon.default_slow_check_clients
498
499        clis=self.get_client(mon=self.mon)
500        if clis == True: #No clients found (empty)
501            self.empty = True # if > 65500 auto_increment was reset
502            self.mon.schedule(event='NO_MORE_CLIENTS')
503            return False
504        #clients found
505        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
506        # if clients found get apps
507        lapps = self.get_apps(clients=clis,mon=self.mon)
508        #lapps can be empty
509        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
510        #If deletion was failed , thread was died
511        lapps_tmp={'apps':[],'clients':[]}
512        for fla in lapps:
513            if THREADED:
514                lapps[fla]['timestamp']=time.time()
515                self.mon.db[fla].q.put(lapps[fla],True)
516            else:
517                lapps_tmp['apps'].extend(lapps[fla]['apps'])
518                lapps_tmp['clients'].extend(lapps[fla]['clients'])
519                self.mon.db['main'].q.put(lapps_tmp,True)
520        #if DEBUG:
521        self.processed+=len(clis)
522        return True
523
524    @timed
525    def process_all_threads(self,*args,**kwargs):
526        lapps=self.q.get(True)
527        #print "Running {}".format(self.t)
528        if THREADED:
529            while (lapps['timestamp'] > self.mon.commited):
530                time.sleep(0.001)
531        if len(lapps['clients']) != 0:
532            printea('Thread {} putting client'.format(self.t),'debug')
533            #IF FAIL, AFTER RETRIES THREAD DIES
534            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
535                self.q.put(lapps,True) # USELESS
536                return False    # USELESS
537        if len(lapps['apps']) != 0:
538            printea('Thread {} putting clientapps'.format(self.t),'debug')
539            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
540                self.q.put(lapps,True) # USELESS
541                return False    # USELESS
542            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
543                self.q.put(lapps,True) # USELESS
544                return False    # USELESS
545        #if DEBUG:
546        self.processed+=len(lapps['clients'])
547        return True
548
549    def process(self,*args,**kwargs):
550        keepalive(self.t)
551        # warning too much verbose, printea("Running thread {}".format(self.t),'debug')
552        if self.t == 'main' and not self.mon.terminate:
553            ret=self.process_main_thread(*args,**kwargs)
554            if ret == False: #No more clients available
555                return True
556            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
557                if THREADED:
558                    return True #Need return to avoid put empty flag and wait into main thread
559
560        #after this poing, code for all threads
561        if not self.q.empty():
562            ret=self.process_all_threads(*args,**kwargs)
563            if ret == False: # USELESS? THREADS was died
564                printea("Error threads")
565                return ret
566        else: 
567            del self.q
568            self.q = queue.Queue()
569            self.empty = True
570        return True
571
572    def worker(self):
573        printea("Starting worker {} processing".format(self.t),'info')
574        while not (self.mon.terminate and self.empty):
575            if self.mon.paused or self.empty:
576                #if self.empty and self.t == 'main':
577                #    self.reset_autoinc()
578                if self.mon.paused:
579                    printea("Paused by high load {}".format(self.t),'debug')
580                # Too much verbose
581                #if self.empty:
582                #    printea("Empty queue {} sleeping by now".format(self.t),'debug')
583                if self.empty:
584                    self.empty = False
585                time.sleep(EMPTY_PAUSED_SLEEP)
586            else:
587                try:
588                    if self.conn == None:
589                        if not self._initialized:
590                            self._initialized = True
591                            raise EnvironmentError('Initializing connector {}'.format(self.t))
592                        else:
593                            raise Warning('Connection not available')
594                    else:
595                        self.conn.begin()
596                    if self.process():
597                        self.conn.commit()
598                        self.mon.commited=time.time()
599                        self.reconnect = 0
600                        if self.need_clean:
601                            gc.collect()
602                    else:
603                        self.conn.rollback()
604                except EnvironmentError as e:
605                    printea(e,'info')
606                    self.init_db()
607                except Warning as e:
608                    printea(e,'warning')
609                    self.init_db()
610                except Exception as e:
611                    try:
612                        if self.conn != None:
613                            self.conn.rollback()
614                    except:
615                        printea("Can't rollback last actions",'info')
616                        pass
617                    #if e[0] != 2006:
618                    #    printea("Exception processing worker({}): {}".format(self.t,e))
619                    #if e[0] == 2006: # SERVER GONE AWAY
620                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
621
622                    printea("Trying to recover connection ({}), {}".format(self.t,e))
623                    if self.reconnect == 100:
624                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
625                        self.mon.term()
626                    else:
627                        self.reconnect+=1
628                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
629                        time.sleep(self.reconnect*self.reconnect)
630
631                        try:
632                            self.init_db()
633                            printea("Recovered worker {} connection".format(self.t),'info')
634                        except:
635                            printea('Unable to initialize worker {}'.format(self.t))
636                            pass
637
638
639class Monitor():
640    def __init__(self):
641        self.MAX_QUEUE_UTILIZATION = 100
642        self.USE_MAX_QUEUES = True
643        self.MAX_SELECT_WINDOW = (2 ** 13) +1
644        self.MIN_SELECT_WINDOW = 32
645        self.MEM_USED=0
646        self.MAX_MEM=512
647        self.MIN_FREE_MEM_SERVER=100
648        self.USE_MAX_MEM=True
649        self.lock = Lock()
650        self.terminate = False
651        self.finished = False
652        self.paused = False
653        self.select_window = self.MIN_SELECT_WINDOW
654        self.commited = time.time()
655        self.procesed = 0
656        self.procesed_per_sec = [0]*10
657        self.load = 0
658        self.server_variables = None  # initialized by main worker
659        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
660        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
661        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
662        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
663        self.default_slow_check_clients = 1
664        self.slow_check_clients = self.default_slow_check_clients
665        self.server_mem = 0
666        self.loadlist = [ 0.0 ] * 100
667        self.max_heap = None
668        self.heap_alert_count = 0
669        self.last_events = []
670
671        signal.signal(signal.SIGQUIT,self.term)
672        signal.signal(signal.SIGTERM,self.term)
673        signal.signal(signal.SIGINT,self.term)
674
675        self.db = {}
676        self.threads = {}
677        for x in THREADS:
678            self.db[x] = DB(self,x)
679            #try:
680            #    self.db[x].init_db()
681            #except Exception as e:
682            #    printea('Error initializing database connections: {}'.format(str(e)))
683            #    sys.exit(1)
684        self.cfg= None
685        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
686
687    def windowctl(self, *args, **kwargs):
688        if args[0] == '+':
689            if self.select_window*2 < self.MAX_SELECT_WINDOW:
690                self.select_window*=2
691                self.select_window=int(self.select_window)
692        if args[0] == '-':
693            if self.select_window > self.MIN_SELECT_WINDOW:
694                self.select_window/=2
695                self.select_window=int(self.select_window)
696
697    def slowcheckctl(self, *args, **kwargs):
698        if args[0] == 'reset':
699            self.default_slow_check_clients = 0
700        if args[0] == '+':
701            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
702                if self.default_slow_check_clients == 0:
703                    self.default_slow_check_clients = 1
704                else:
705                    self.default_slow_check_clients = self.default_slow_check_clients * 2
706        if args[0] == '-':
707            if self.default_slow_check_clients > 0:
708                self.default_slow_check_clients = self.default_slow_check_clients / 2
709
710    def append_event_log(self,*args,**kwargs):
711        if DEBUG:
712            if USE_FILE_EVENT_LOG:
713                try:
714                    fp = open(FILE_EVENT_LOG,'a')
715                except:
716                    fp = False
717            else:
718                fp = False
719            separator=':'
720            for x in args:
721                the_time=str(int(time.time()))
722                the_value=str(x)
723                the_string=the_time+separator+the_value
724                self.last_events.append(the_string)
725                if fp:
726                    fp.write('{}\n'.format(the_string))
727            if fp:
728                fp.close()
729
730    def schedule(self, *args, **kwargs):
731        if kwargs['event'] == 'NO_MORE_CLIENTS':
732            self.append_event_log(kwargs['event'])
733            self.windowctl('-')
734            self.slowcheckctl('+')
735
736        if kwargs['event'] == 'HAVE_CLIENTS':
737            self.append_event_log(kwargs['event'])
738            if kwargs['nclients'] == self.select_window:
739                self.windowctl('+')
740            else:
741                self.windowctl('-')
742            self.slowcheckctl('reset')
743#
744#                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
745#                    self.windowctl('+')
746#
747#                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
748#                    self.windowctl('-')
749#                    self.slowcheckctl('+')
750#                else:
751#                    self.windowctl('+')
752#
753#            elif kwargs['nclients'] < self.select_window/2:
754#                self.windowctl('-')
755#
756#            self.slowcheckctl('reset')
757
758        if kwargs['event']=='CHECK_SANITY':
759            # CPU SETTINGS
760            if not self.terminate and self.load > LIMIT:
761                self.append_event_log('LOAD_LIMIT_REACHED')
762                self.paused = True
763            else:
764                self.paused = False
765
766            # DB MEM SETTINGS
767            if self.max_heap and self.temporary_tables_size['sum']:
768                if self.temporary_tables_size['sum'] > self.max_heap * 0.3:
769                    self.heap_alert_count += 1
770                    self.append_event_log('MAX_HEAP_ALERT')
771                    self.windowctl('+')
772                    self.slowcheckctl('-')
773                    if self.heap_alert_count > 50:
774                        self.append_event_log('EXTREME_HEAP_RULES')
775                        self.slowcheckctl('reset')
776                    if self.paused:
777                        #printea('Hitting max temporary table size unpausing','critical')
778                        self.paused = False
779                else:
780                    self.heap_alert_count=0
781            # SERVER MEM
782            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
783                self.append_event_log('SERVER_MEM_ALERT')
784                #printea('Hitting max memory from server collecting and reducing window','critical')
785                self.windowctl('-')
786                self.slowcheckctl('+')
787                self.USE_MAX_QUEUES=True
788                for x in THREADS:
789                    self.db[x].need_clean=True
790                gc.collect()
791            else:
792#                self.USE_MAX_QUEUES=False
793                for x in THREADS:
794                    self.db[x].need_clean=False
795
796            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
797                self.append_event_log('MAX_MEM_ALERT')
798                self.windowctl('-')
799                self.slowcheckctl('+')
800
801            # QUEUES
802            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
803                self.append_event_log('MAX_QUEUES_REACHED')
804                self.windowctl('+')
805
806
807    def get_cpu_load(self):
808        self.loadlist.append(psutil.cpu_percent())
809        self.loadlist=self.loadlist[1:]
810        avg=0.0
811        for x in self.loadlist:
812            avg+=x
813        return round(avg/100.0,2)
814
815    def term(self,*args,**kwargs):
816        printea("Begin kill the program, wait please...",'info')
817        self.terminate=True
818
819    def prepare_threads(self):
820        global DAEMON
821
822        self.threads['load']=Process(target=self.get_load)
823        self.threads['load'].daemon = DAEMON
824        self.threads['load'].start()
825        for x in THREADS:
826            self.threads[x]=Process(target=self.db[x].worker)
827            self.threads[x].daemon = DAEMON
828            self.threads[x].start()
829
830    def get_mem_usage(self):
831        process = psutil.Process(os.getpid())
832        mem = process.memory_info()[0] / float(2 ** 20)
833        return mem
834
835    def get_server_free_mem(self):
836        mem = psutil.virtual_memory()
837        return mem.free / (2 ** 20)
838
839    def print_stats(self):
840        global CHECK_LOAD_TIME
841#        if DEBUG:
842        if True:
843            out="Processed: "
844            out2=''
845            total=0
846            for x in THREADS:
847                out2+='{}={} '.format(x,self.db[x].processed)
848                self.cfg.store('processed '+x,self.db[x].processed)
849                if THREADED:
850                    if x != 'main':
851                        total += self.db[x].processed
852                else:
853                    total += self.db[x].processed
854            out += "TOTAL={} {}".format(total,out2)
855            self.cfg.store('processed total',total)
856            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
857            self.procesed_per_sec.append(proc_per_sec)
858            self.procesed_per_sec=self.procesed_per_sec[-10:]
859            f=lambda x,y:x+y
860            suma_proc=reduce(f,self.procesed_per_sec)
861            self.cfg.store('processing at',int(suma_proc/10))
862            self.procesed=total
863            total = 0
864            if THREADED:
865                out+="Queues: "
866                sizes=self.get_q_sizes()
867                out2=''
868                for x in sizes:
869                    self.cfg.store('queue '+x,sizes[x])
870                    out2+='{}={} '.format(x,sizes[x])
871                    total += sizes[x]
872                self.cfg.store('queue totals',total)
873                out+="TOTAL={} {}".format(total,out2)
874            self.cfg.store('select window',self.select_window)
875            self.cfg.store('mem used',self.MEM_USED)
876            self.cfg.store('load',self.load)
877            if (self.paused):
878                self.cfg.store('paused',1)
879            else:
880                self.cfg.store('paused',0)
881            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
882            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
883            self.cfg.store('temp_clients_rows',int(self.temporary_tables_rows['clients']))
884            self.cfg.store('temp_packages_rows',int(self.temporary_tables_rows['packages']))
885            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
886            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
887            self.cfg.store('default_slow_check_clients',int(self.default_slow_check_clients))
888            self.cfg.store('slow_check_clients',int(self.slow_check_clients))
889            if DEBUG:
890                if USE_FILE_EVENT_DB:
891                    max_event=len(self.last_events)
892                    if max_event > 20:
893                        max_event = 20
894                    for i in range(0,max_event):
895                        self.cfg.store('event{:02d}'.format(i),self.last_events[i])
896                self.last_events=self.last_events[-20:]
897                if THREADED:
898                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
899                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
900                else:
901                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
902                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
903
904    def get_q_sizes(self):
905        sizes={}
906        for x in THREADS:
907            sizes[x]=self.db[x].q.qsize()
908        return sizes
909
910    def sum_q_sizes(self):
911        sizes=self.get_q_sizes()
912        f=lambda x,y: x+y
913        return reduce(f,[sizes[x] for x in sizes])
914
915    def all_queues_empty(self):
916        sizes=self.get_q_sizes()
917        for x in sizes:
918            if sizes[x] != 0:
919                return False
920        return True
921
922    def get_load(self):
923        # runs on separate thread
924        db = DB(self,'load')
925        db.init_db()
926        self.cfg = Config(db)
927        ctime=0
928        while not (self.terminate and self.finished): #and not self.paused
929            self.cfg.store('keepalive',int(time.time()))
930            time.sleep(1)
931            self.load=self.get_cpu_load()
932            ctime+=1
933            self.schedule(event='CHECK_SANITY')
934            if ctime >CHECK_LOAD_TIME:
935                self.cfg.write()
936                ctime=0.0
937
938                self.MEM_USED=self.get_mem_usage()
939                self.server_mem=self.get_server_free_mem()
940
941                if self.server_variables and 'max_heap_table_size' in self.server_variables:
942                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
943                    if self.max_heap < WARNING_MIN_HEAP:
944                        printea("******************** MIN HEAP IS TOO SMALL -> {} < {} ********************".format(self.max_heap,WARNING_MIN_HEAP),'warning')
945
946                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='tmp_clients')
947                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='tmp_packages')
948                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='Client_Versions')
949                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='RecvPackages')
950                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
951                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
952                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
953                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
954                qempty = self.all_queues_empty()
955                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
956                    db.reset_autoinc()
957                self.print_stats()
958                #self.load=os.getloadavg()[0]
959                if qempty:
960                    gc.collect()
961            #end if
962        #end while
963        db.close_db()
964
965    def start(self):
966        self.prepare_threads()
967
968    def end(self):
969        for x in self.db:
970            self.threads[x].join()
971            self.db[x].close_db()
972        self.finished = True
973        self.print_stats()
974
975
976class Config():
977    def __init__(self,connection):
978        self._db = connection
979        self.read()
980
981    def store(self,var,value):
982        var=var.replace(' ','_')
983        if isinstance(value,str):
984            if value.isnumeric():
985                setattr(self,var,int(value))
986            else:
987                setattr(self,var,str(value))
988        else:
989            setattr(self,var,int(value))
990
991    def write(self):
992        values={}
993        for x in self.get_internal_vars():
994            values.setdefault(str(x)[:20],str(getattr(self,x))[:45])
995        if self._db:
996            self._db.put_config(values=values)
997        the_string="CONFIGURATION\n"
998        if USE_CONFIG_FILE_LOG:
999            fnbuild = lambda kv : str(kv[0])+'='+str(kv[1])+"\n"
1000            the_string+=reduce((lambda x,y: x+y) ,map(fnbuild,values.items()))
1001            try:
1002                with open(CONFIG_FILE_LOG,'w') as fp:
1003                    fp.write(the_string)
1004            except:
1005                pass
1006
1007    def read(self):
1008        if self._db:
1009            config=self._db.get_config()
1010            for key in config.keys():
1011                if config[key].isnumeric():
1012                    setattr(self,key,int(config[key]))
1013                else:
1014                    setattr(self,key,config[key])
1015        else:
1016            printea('No config yet')
1017
1018    def get_internal_vars(self):
1019        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
1020
1021    def print(self):
1022        for v in self.get_internal_vars():
1023            print('{} = {}'.format(v,getattr(self,v)))
1024
1025
1026def main(*args,**kwargs):
1027    gc.enable()
1028    if DAEMON:
1029        fp = open('/var/run/analyticsd.pid','w');
1030        fp.write(str(os.getpid()))
1031        fp.close()
1032    m = Monitor()
1033    m.start()
1034    printea("start done",'info')
1035    while not m.terminate:
1036        time.sleep(0.5)
1037    m.end()
1038    printea("Exitting...",'info')
1039
1040if __name__ == "__main__":
1041    exit = 0
1042    keyword='analyticsd'
1043    interpreter='python3'
1044    for proc in psutil.process_iter():
1045        a=False
1046        b=False
1047        for argument in proc.cmdline():
1048            #print('{} {} {}'.format(proc.cmdline,keyword,argument[-len(keyword):]))
1049            if interpreter in argument[-len(interpreter):]:
1050                a = True
1051            if keyword in argument[-len(keyword):]:
1052                b = True
1053            if a and b:
1054                exit = exit +1
1055    if exit > 1:
1056        printea('Another daemon is running','error')
1057        sys.exit(1)
1058
1059    lck = '/var/run/analyticsd'
1060    if DAEMON:
1061        if os.path.isfile(lck+'.lock'):
1062            printea('Lockfile {} detected, unable to start'.format(lck),'error')
1063            sys.exit(1)
1064        else:
1065            try:
1066                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
1067                    main()
1068            except Exception as e:
1069                printea(e)
1070                sys.exit(1)
1071    else:
1072        main()
Note: See TracBrowser for help on using the repository browser.