source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 7112

Last change on this file since 7112 was 7112, checked in by mabarracus, 2 years ago

wip scheduler improvements

  • Property svn:executable set to *
File size: 40.9 KB
Line 
1#!/usr/bin/env python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7#import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34WARNING_MIN_HEAP = 256
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39MAX_SLOW_CHECK_CLIENTS = 30
40USE_FILE_EVENT_LOG=True
41USE_FILE_EVENT_DB=False
42USE_CONFIG_FILE_LOG=True
43CONFIG_FILE_LOG='/var/run/analyticsd_config.log'
44FILE_EVENT_LOG='/var/run/analyticsd_event.log'
45
46if DEBUG:
47    if MIN_LOG_LEVEL:
48        loglevel=MIN_LOG_LEVEL
49    else:
50        loglevel=logging.DEBUG
51else:
52    loglevel=logging.INFO
53
54LOGGING = {
55    'version': 1,
56    'disable_existing_loggers': False,
57    'formatters': {
58        'verbose': {
59            'format': '%(levelname)s %(module)s %(message)s'
60            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
61            },
62        },
63    'handlers': {
64        'stdout': {
65            'class': 'logging.StreamHandler',
66            'stream': sys.stdout,
67            'formatter': 'verbose',
68            },
69        'sys-logger6': {
70            'class': 'logging.handlers.SysLogHandler',
71            'address': '/dev/log',
72            'facility': "local6",
73            'formatter': 'verbose',
74            },
75        },
76    'loggers': {
77        'analyticsd-logger': {
78            'handlers': ['sys-logger6','stdout'],
79            'level': loglevel,
80            'propagate': True,
81            },
82        }
83    }
84
85def to_str(x):
86    if isinstance(x,str):
87        return to_utf(x).decode('unicode_escape')
88    else:
89        return x
90
91def to_utf(x):
92    if isinstance(x,str):
93        return x.encode('utf-8')
94    else:
95        return x
96
97def printea(msg="",level='critical'):
98    if level == 'critical':
99        logger.critical(msg)
100    elif level == 'error':
101        logger.error(msg)
102    elif level == 'warning':
103        logger.warning(msg)
104    elif level == 'info':
105        logger.info(msg)
106    else:
107        logger.debug(msg)
108
109def keepalive(who=""):
110    global DEBUG_PRINT_ALIVE_COUNTER
111    t=str(int(time.time()))
112    if DEBUG:
113        pass
114        # too much verbose
115        # printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
116    else:
117        if who == 'main':
118            if DEBUG_PRINT_ALIVE_COUNTER > 0:
119                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
120            else:
121                DEBUG_PRINT_ALIVE_COUNTER=60
122                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
123
124    fp = open('/var/run/analyticsd.keepalive','w');
125    fp.write(t)
126    fp.close()
127
128config.dictConfig(LOGGING)
129logger = logging.getLogger('analyticsd-logger')
130
131DBNAME=None
132USER=None
133PASS=None
134IP=None
135
136EMPTY_PAUSED_SLEEP=1
137CHECK_LOAD_TIME=5
138MAX_RETRIES=5
139TIMED_ON=False
140
141try:
142    with open(FILE,'r') as f:
143        for line in f:
144            if not IP:
145                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
146                if IP:
147                    IP = IP.group(1)
148            if not DBNAME:
149                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
150                if DBNAME:
151                    DBNAME = DBNAME.group(1)
152            if not USER:
153                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
154                if USER:
155                    USER = USER.group(1)
156            if not PASS:
157                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
158                if PASS:
159                    PASS = PASS.group(1)
160    if not (IP or DBNAME or USER or PASS):
161        printea("Couldn't get database configuration from {}".format(FILE))
162    else:
163        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
164except Exception as e:
165    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
166    sys.exit(1)
167
168if not (IP or DBNAME or USER or PASS):
169    printea("Couldn't get database configuration from {}".format(FILE))
170    sys.exit(1)
171
172if THREADED:
173    THREADS=['main','server','client','desktop','other']
174else:
175    THREADS=['main']
176
177class DB():
178    def __init__(self,mon,t='main'):
179        self._initialized=False
180        self.t=t
181        self.reconnect=0
182        self.empty = False
183        self.conn=None
184        self.mon=mon
185        self.q=queue.Queue()
186        self.processed=0
187        self.need_clean = False
188        printea('Database worker {} initialized'.format(t),'info')
189
190    def timed(func):
191        @wraps(func)
192        def wrapper(*args,**kwargs):
193            if TIMED_ON:
194                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
195            ret=func(*args,**kwargs)
196            if TIMED_ON:
197                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
198            return ret
199        return wrapper
200
201    def with_retry(func):
202        @wraps(func)
203        def wrapper(*args,**kwargs):
204            if 'retry' not in kwargs:
205                kwargs['retry']=1
206            if 'mon' not in kwargs:
207                kwargs['mon'] = None
208            try:
209                return func(*args,**kwargs) 
210            except Exception as e:
211                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
212                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
213                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
214                    if kwargs['mon']:
215                        kwargs['mon'].term()
216                        sys.exit(1)
217                    return None
218                else:
219                    time.sleep(kwargs['retry']**2)
220                    kwargs['retry']+=1
221                    return wrapper(*args,**kwargs)
222            return result
223        return wrapper
224
225    def with_debug(func):
226        @wraps(func)
227        def wrapper(*args,**kwargs):
228            if 'query' in kwargs:
229                printea("executing query: {}".format(kwargs['query']),'debug')
230            return func(*args,**kwargs)
231        return wrapper
232
233    @with_debug
234    def execute(self,*args,**kwargs):
235        if 'query' not in kwargs:
236            printea("Warning execute called whithout query",'info')
237            return None
238        try:
239            return self.cur.execute(kwargs['query'])
240        except mdb.OperationalError as e:
241                printea('({}) Operational error on mysql, error is: {}'.format(self.t,e),'error')
242                self.init_db()
243        except Exception as e:
244                raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
245
246    def init_db(self):
247        try:
248            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
249            self.conn.autocommit(False)
250            self.cur = self.conn.cursor()
251            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
252            variables=self.check_server_variables()
253
254            if variables:
255                self.mon.server_variables=variables
256
257            printea("Connected succesfully {} thread_id={}".format(self.t,self.conn.thread_id()),'info')
258
259        except mdb.Error as e:
260            printea("Error {}: {}".format(e.args[0],e.args[1]))
261            raise Exception(e)
262
263    def check_server_variables(self):
264        if self.t != 'main':
265            return None
266        else:
267            printea('Getting server variables','info')
268            result = {}
269            self.cur.execute("show global variables")
270            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
271            if self.cur.rowcount > 0:
272                for i in range(self.cur.rowcount):
273                    var_name, var_value = self.cur.fetchone()
274                    result.setdefault(var_name,var_value)
275            return result
276
277    def get_config(self):
278        values={}
279        self.cur.execute('select name,value from Config')
280        for val in self.cur.fetchall():
281            values.setdefault(val[0],val[1])
282        return values
283
284    @with_retry
285    def put_config(self,*args,**kwargs):
286        values = kwargs.get('values')
287        vals=[]
288        for x in values.keys():
289            vals.append("('{}','{}')".format(x,values[x]))
290        try:
291            self.execute(query="insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
292            self.conn.commit()
293        except Exception as e:
294            printea(e,'error')
295
296    @with_retry
297    def check_temporary_tables_size(self,*args,**kwargs):
298        table_name=kwargs.get('table_name')
299        if self.t != 'load':
300            return None
301        else:
302            size = 0
303            rows = 0
304            self.execute(query='select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
305
306            res=self.cur.fetchone()
307            size = float(res[0])
308            rows = int(res[1])
309            return (int(size),rows)
310
311    def close_db(self):
312        if self.conn:
313            self.conn.close()
314            printea("Closed connection {}".format(self.t),'info')
315
316    def reduce_flavour(self,version,flavour):
317        if version == '15':
318            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
319                return 'desktop'
320            elif 'server' in flavour:
321                return 'server'
322            elif 'client' in flavour:
323                return 'client' 
324        elif version == '16':
325            if 'server' in flavour:
326                return 'server'
327            elif 'client' in flavour:
328                return 'client'
329            elif 'desktop' in flavour:
330                return 'desktop'
331        return 'other'
332
333    def reduce_version(self,version):
334        if version[0:2] in ['15','16']:
335            return version[0:2]
336        else:
337            return 'other'
338
339    def gen_uuid(self,*args,**kwargs):
340        try:
341            string=to_utf(u'-'.join([str(x) for x in args]))
342            h=hashlib.sha1(string)
343            return int(h.hexdigest()[0:16],16)
344        except Exception as e:
345            printea(traceback.format_exc(),'critical')
346
347    @timed
348    @with_retry
349    def get_client(self,*args,**kwargs):
350        try:
351            query="SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu,ltsp,mode from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
352            self.execute(query=query)
353            ret =[]
354            if self.cur.rowcount > 0:
355                for i in range(self.cur.rowcount):
356                    v_id,v_date,v_user,v_version,v_flavour,v_arch,v_mem,v_vga,v_cpu,v_ncpu,v_ltsp,v_mode=self.cur.fetchone()
357                    version=self.reduce_version(v_version)
358                    flavour=self.reduce_flavour(version,v_flavour)
359                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
360                    if not v_arch:
361                        v_arch = 'NULL'
362                    if not v_mem:
363                        v_mem = 'NULL'
364                    if not v_vga:
365                        v_vga = 'NULL'
366                    if not v_cpu:
367                        v_cpu = 'NULL'
368                    if not v_ncpu:
369                        v_ncpu = 'NULL'
370                    if v_ltsp == '1' or v_ltsp == True:
371                        v_ltsp = 'TRUE'
372                    elif v_ltsp == '0' or v_ltsp == False:
373                        v_ltsp = 'FALSE'
374                    else:
375                        v_ltsp = 'NULL'
376                    if not v_mode:
377                        v_mode='NULL'
378
379                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour,'arch':v_arch,'mem':v_mem,'vga':v_vga,'cpu':v_cpu,'ncpu':v_ncpu,'ltsp':v_ltsp,'mode':v_mode})
380                return ret
381            else:
382                return True
383        except Exception as e:
384            raise Exception("Error getting client: {}".format(e))
385
386    @timed
387    @with_retry
388    def get_apps(self,*args,**kwargs):
389        if 'clients' not in kwargs:
390            printea("Warning executed without named parameter clients",'info')
391            return None
392        ret = {}
393        try:
394            cids_to_rel_fla={}
395            for c in kwargs['clients']:
396                if c['id'] not in cids_to_rel_fla:
397                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
398                if c['flavour'] not in ret:
399                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
400            for c in kwargs['clients']:
401                ret[c['flavour']]['clients'].append(c)
402
403            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
404            self.execute(query=query)
405            if self.cur.rowcount > 0:
406                for i in range(self.cur.rowcount):
407                    row = self.cur.fetchone()
408                    clid = row[0]
409                    rel,fla = cids_to_rel_fla[clid]
410                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
411                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
412            return ret
413        except Exception as e:
414            raise Exception("Error getting apps: {}".format(e))
415
416    @timed
417    @with_retry
418    def put_client(self,*args,**kwargs):
419        if 'client_list' not in kwargs:
420            raise Exception("Called without named parameter client_list")
421        values= []
422        for cli in kwargs['client_list']:
423            values.append("({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{},{},'{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour'],cli['arch'],cli['mem'],cli['vga'],cli['cpu'],cli['ncpu'],cli['ltsp'],cli['mode']))
424        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`,`ltsp`,`mode`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
425        self.execute(query=query)
426        return True
427
428    @timed
429    @with_retry
430    def put_apps(self,*args,**kwargs):
431        if 'apps' not in kwargs:
432            raise Exception("Called without named parameter apps")
433        app_list = {}
434        for app in kwargs['apps']:
435            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
436            if str(app['uuid']) not in app_list:
437                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
438            else:
439                app_list[str(app['uuid'])]['value']+=app['value']
440        values = []
441        for app in app_list:
442            item=app_list[app]
443            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
444        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
445        self.execute(query=query)
446        return True
447
448    @timed
449    @with_retry
450    def del_client(self,*args,**kwargs):
451        if 'client_list' not in kwargs:
452            raise Exception("Called without named parameter client_list")
453        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
454        self.execute(query=query)
455        return True
456
457    @timed
458    @with_retry
459    def del_apps(self,*args,**kwargs):
460        if 'client_list' not in kwargs:
461            raise Exception("Called without named parameter client_list")
462        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
463        self.execute(query=query)
464        return True
465
466    @timed
467    def reset_autoinc(self):
468        try:
469            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
470            self.execute(query=query)
471            ainc = self.cur.fetchone()[0]
472            #query = "SELECT count(*) from tmp_clients"
473            #self.execute(query=query)
474            #cli_size=self.cur.fetchone()[0]
475            #query = "SELECT count(*) from tmp_packages"
476            #self.execute(query=query)
477            #pkg_size=self.cur.fetchone()[0]
478            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
479            if ainc > 65500 and self.mon.all_queues_empty():
480            #and cli_size == 0 and pkg_size == 0:
481                query = "TRUNCATE TABLE tmp_clients"
482                self.execute(query=query)
483                query = "TRUNCATE TABLE tmp_packages"
484                self.execute(query=query)
485            return True
486        except Exception as e:
487            raise Exception("Error reseting auto_increment: {}".format(e))
488
489
490    @timed
491    def process_main_thread(self,*args,**kwargs):
492        if self.mon.slow_check_clients > 0 :
493            self.mon.slow_check_clients -= 1
494            time.sleep(1)
495            return True
496        else:
497            self.mon.slow_check_clients = self.mon.default_slow_check_clients
498
499        clis=self.get_client(mon=self.mon)
500        if clis == True: #No clients found (empty)
501            self.empty = True # if > 65500 auto_increment was reset
502            self.mon.schedule(event='NO_MORE_CLIENTS')
503            return False
504        #clients found
505        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
506        # if clients found get apps
507        lapps = self.get_apps(clients=clis,mon=self.mon)
508        #lapps can be empty
509        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
510        #If deletion was failed , thread was died
511        lapps_tmp={'apps':[],'clients':[]}
512        for fla in lapps:
513            if THREADED:
514                lapps[fla]['timestamp']=time.time()
515                self.mon.db[fla].q.put(lapps[fla],True)
516            else:
517                lapps_tmp['apps'].extend(lapps[fla]['apps'])
518                lapps_tmp['clients'].extend(lapps[fla]['clients'])
519                self.mon.db['main'].q.put(lapps_tmp,True)
520        #if DEBUG:
521        self.processed+=len(clis)
522        return True
523
524    @timed
525    def process_all_threads(self,*args,**kwargs):
526        lapps=self.q.get(True)
527        #print "Running {}".format(self.t)
528        if THREADED:
529            while (lapps['timestamp'] > self.mon.commited):
530                time.sleep(0.001)
531        if len(lapps['clients']) != 0:
532            printea('Thread {} putting client'.format(self.t),'debug')
533            #IF FAIL, AFTER RETRIES THREAD DIES
534            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
535                self.q.put(lapps,True) # USELESS
536                return False    # USELESS
537        if len(lapps['apps']) != 0:
538            printea('Thread {} putting clientapps'.format(self.t),'debug')
539            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
540                self.q.put(lapps,True) # USELESS
541                return False    # USELESS
542            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
543                self.q.put(lapps,True) # USELESS
544                return False    # USELESS
545        #if DEBUG:
546        self.processed+=len(lapps['clients'])
547        return True
548
549    def process(self,*args,**kwargs):
550        keepalive(self.t)
551        # warning too much verbose, printea("Running thread {}".format(self.t),'debug')
552        if self.t == 'main' and not self.mon.terminate:
553            ret=self.process_main_thread(*args,**kwargs)
554            if ret == False: #No more clients available
555                return True
556            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
557                if THREADED:
558                    return True #Need return to avoid put empty flag and wait into main thread
559
560        #after this poing, code for all threads
561        if not self.q.empty():
562            ret=self.process_all_threads(*args,**kwargs)
563            if ret == False: # USELESS? THREADS was died
564                printea("Error threads")
565                return ret
566        else: 
567            del self.q
568            self.q = queue.Queue()
569            self.empty = True
570        return True
571
572    def worker(self):
573        printea("Starting worker {} processing".format(self.t),'info')
574        while not (self.mon.terminate and self.empty):
575            if self.mon.paused or self.empty:
576                #if self.empty and self.t == 'main':
577                #    self.reset_autoinc()
578                if self.mon.paused:
579                    printea("Paused by high load {}".format(self.t),'debug')
580                # Too much verbose
581                #if self.empty:
582                #    printea("Empty queue {} sleeping by now".format(self.t),'debug')
583                if self.empty:
584                    self.empty = False
585                time.sleep(EMPTY_PAUSED_SLEEP)
586            else:
587                try:
588                    if self.conn == None:
589                        if not self._initialized:
590                            self._initialized = True
591                            raise EnvironmentError('Initializing connector {}'.format(self.t))
592                        else:
593                            raise Warning('Connection not available')
594                    else:
595                        self.conn.begin()
596                    if self.process():
597                        self.conn.commit()
598                        self.mon.commited=time.time()
599                        self.reconnect = 0
600                        if self.need_clean:
601                            gc.collect()
602                    else:
603                        self.conn.rollback()
604                except EnvironmentError as e:
605                    printea(e,'info')
606                    self.init_db()
607                except Warning as e:
608                    printea(e,'warning')
609                    self.init_db()
610                except Exception as e:
611                    try:
612                        if self.conn != None:
613                            self.conn.rollback()
614                    except:
615                        printea("Can't rollback last actions",'info')
616                        pass
617                    #if e[0] != 2006:
618                    #    printea("Exception processing worker({}): {}".format(self.t,e))
619                    #if e[0] == 2006: # SERVER GONE AWAY
620                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
621
622                    printea("Trying to recover connection ({}), {}".format(self.t,e))
623                    if self.reconnect == 100:
624                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
625                        self.mon.term()
626                    else:
627                        self.reconnect+=1
628                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
629                        time.sleep(self.reconnect*self.reconnect)
630
631                        try:
632                            self.init_db()
633                            printea("Recovered worker {} connection".format(self.t),'info')
634                        except:
635                            printea('Unable to initialize worker {}'.format(self.t))
636                            pass
637
638
639class Monitor():
640    def __init__(self):
641        self.MAX_QUEUE_UTILIZATION = 100
642        self.USE_MAX_QUEUES = True
643        self.MAX_SELECT_WINDOW = (2 ** 13) +1
644        self.MIN_SELECT_WINDOW = 32
645        self.MEM_USED=0
646        self.MAX_MEM=512
647        self.MIN_FREE_MEM_SERVER=100
648        self.USE_MAX_MEM=True
649        self.lock = Lock()
650        self.terminate = False
651        self.finished = False
652        self.paused = False
653        self.select_window = self.MIN_SELECT_WINDOW
654        self.commited = time.time()
655        self.procesed = 0
656        self.procesed_per_sec = [0]*10
657        self.load = 0
658        self.server_variables = None  # initialized by main worker
659        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
660        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
661        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
662        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
663        self.default_slow_check_clients = 1
664        self.slow_check_clients = self.default_slow_check_clients
665        self.server_mem = 0
666        self.loadlist = [ 0.0 ] * 100
667        self.max_heap = None
668        self.last_events = []
669
670        signal.signal(signal.SIGQUIT,self.term)
671        signal.signal(signal.SIGTERM,self.term)
672        signal.signal(signal.SIGINT,self.term)
673
674        self.db = {}
675        self.threads = {}
676        for x in THREADS:
677            self.db[x] = DB(self,x)
678            #try:
679            #    self.db[x].init_db()
680            #except Exception as e:
681            #    printea('Error initializing database connections: {}'.format(str(e)))
682            #    sys.exit(1)
683        self.cfg= None
684        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
685
686    def windowctl(self, *args, **kwargs):
687        if args[0] == '+':
688            if self.select_window*2 < self.MAX_SELECT_WINDOW:
689                self.select_window*=2
690                self.select_window=int(self.select_window)
691        if args[0] == '-':
692            if self.select_window > self.MIN_SELECT_WINDOW:
693                self.select_window/=2
694                self.select_window=int(self.select_window)
695
696    def slowcheckctl(self, *args, **kwargs):
697        if args[0] == 'reset':
698            self.default_slow_check_clients = 0
699        if args[0] == '+':
700            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
701                if self.default_slow_check_clients == 0:
702                    self.default_slow_check_clients = 1
703                else:
704                    self.default_slow_check_clients = self.default_slow_check_clients * 2
705        if args[0] == '-':
706            if self.default_slow_check_clients > 0:
707                self.default_slow_check_clients = self.default_slow_check_clients / 2
708
709    def append_event_log(self,*args,**kwargs):
710        if DEBUG:
711            if USE_FILE_EVENT_LOG:
712                try:
713                    fp = open(FILE_EVENT_LOG,'a')
714                except:
715                    fp = False
716            else:
717                fp = False
718            separator=':'
719            for x in args:
720                the_time=str(int(time.time()))
721                the_value=str(x)
722                the_string=the_time+separator+the_value
723                self.last_events.append(the_string)
724                if fp:
725                    fp.write('{}\n'.format(the_string))
726            if fp:
727                fp.close()
728
729    def schedule(self, *args, **kwargs):
730        if kwargs['event'] == 'NO_MORE_CLIENTS':
731            self.append_event_log(kwargs['event'])
732            self.windowctl('-')
733            self.slowcheckctl('+')
734
735        if kwargs['event'] == 'HAVE_CLIENTS':
736            self.append_event_log(kwargs['event'])
737            if kwargs['nclients'] == self.select_window:
738                self.windowctl('+')
739            else:
740                self.windowctl('-')
741            self.slowcheckctl('reset')
742#
743#                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
744#                    self.windowctl('+')
745#
746#                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
747#                    self.windowctl('-')
748#                    self.slowcheckctl('+')
749#                else:
750#                    self.windowctl('+')
751#
752#            elif kwargs['nclients'] < self.select_window/2:
753#                self.windowctl('-')
754#
755#            self.slowcheckctl('reset')
756
757        if kwargs['event']=='CHECK_SANITY':
758            # CPU SETTINGS
759            if not self.terminate and self.load > LIMIT:
760                self.append_event_log('LOAD_LIMIT_REACHED')
761                self.paused = True
762            else:
763                self.paused = False
764
765            # DB MEM SETTINGS
766            if self.max_heap and self.temporary_tables_size['sum']:
767                if self.temporary_tables_size['sum'] > self.max_heap * 0.3:
768                    self.append_event_log('MAX_HEAP_ALERT')
769                    self.windowctl('+')
770                    self.slowcheckctl('-')
771                    if self.paused:
772                        #printea('Hitting max temporary table size unpausing','critical')
773                        self.paused = False
774            # SERVER MEM
775            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
776                self.append_event_log('SERVER_MEM_ALERT')
777                #printea('Hitting max memory from server collecting and reducing window','critical')
778                self.windowctl('-')
779                self.slowcheckctl('+')
780                self.USE_MAX_QUEUES=True
781                for x in THREADS:
782                    self.db[x].need_clean=True
783                gc.collect()
784            else:
785#                self.USE_MAX_QUEUES=False
786                for x in THREADS:
787                    self.db[x].need_clean=False
788
789            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
790                self.append_event_log('MAX_MEM_ALERT')
791                self.windowctl('-')
792                self.slowcheckctl('+')
793
794            # QUEUES
795            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
796                self.append_event_log('MAX_QUEUES_REACHED')
797                self.windowctl('+')
798
799
800    def get_cpu_load(self):
801        self.loadlist.append(psutil.cpu_percent())
802        self.loadlist=self.loadlist[1:]
803        avg=0.0
804        for x in self.loadlist:
805            avg+=x
806        return round(avg/100.0,2)
807
808    def term(self,*args,**kwargs):
809        printea("Begin kill the program, wait please...",'info')
810        self.terminate=True
811
812    def prepare_threads(self):
813        global DAEMON
814
815        self.threads['load']=Process(target=self.get_load)
816        self.threads['load'].daemon = DAEMON
817        self.threads['load'].start()
818        for x in THREADS:
819            self.threads[x]=Process(target=self.db[x].worker)
820            self.threads[x].daemon = DAEMON
821            self.threads[x].start()
822
823    def get_mem_usage(self):
824        process = psutil.Process(os.getpid())
825        mem = process.memory_info()[0] / float(2 ** 20)
826        return mem
827
828    def get_server_free_mem(self):
829        mem = psutil.virtual_memory()
830        return mem.free / (2 ** 20)
831
832    def print_stats(self):
833        global CHECK_LOAD_TIME
834#        if DEBUG:
835        if True:
836            out="Processed: "
837            out2=''
838            total=0
839            for x in THREADS:
840                out2+='{}={} '.format(x,self.db[x].processed)
841                self.cfg.store('processed '+x,self.db[x].processed)
842                if THREADED:
843                    if x != 'main':
844                        total += self.db[x].processed
845                else:
846                    total += self.db[x].processed
847            out += "TOTAL={} {}".format(total,out2)
848            self.cfg.store('processed total',total)
849            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
850            self.procesed_per_sec.append(proc_per_sec)
851            self.procesed_per_sec=self.procesed_per_sec[-10:]
852            f=lambda x,y:x+y
853            suma_proc=reduce(f,self.procesed_per_sec)
854            self.cfg.store('processing at',int(suma_proc/10))
855            self.procesed=total
856            total = 0
857            if THREADED:
858                out+="Queues: "
859                sizes=self.get_q_sizes()
860                out2=''
861                for x in sizes:
862                    self.cfg.store('queue '+x,sizes[x])
863                    out2+='{}={} '.format(x,sizes[x])
864                    total += sizes[x]
865                self.cfg.store('queue totals',total)
866                out+="TOTAL={} {}".format(total,out2)
867            self.cfg.store('select window',self.select_window)
868            self.cfg.store('mem used',self.MEM_USED)
869            self.cfg.store('load',self.load)
870            if (self.paused):
871                self.cfg.store('paused',1)
872            else:
873                self.cfg.store('paused',0)
874            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
875            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
876            self.cfg.store('temp_clients_rows',int(self.temporary_tables_rows['clients']))
877            self.cfg.store('temp_packages_rows',int(self.temporary_tables_rows['packages']))
878            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
879            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
880            self.cfg.store('default_slow_check_clients',int(self.default_slow_check_clients))
881            self.cfg.store('slow_check_clients',int(self.slow_check_clients))
882            if DEBUG:
883                if USE_FILE_EVENT_DB:
884                    max_event=len(self.last_events)
885                    if max_event > 20:
886                        max_event = 20
887                    for i in range(0,max_event):
888                        self.cfg.store('event{:02d}'.format(i),self.last_events[i])
889                self.last_events=self.last_events[-20:]
890                if THREADED:
891                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
892                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
893                else:
894                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
895                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
896
897    def get_q_sizes(self):
898        sizes={}
899        for x in THREADS:
900            sizes[x]=self.db[x].q.qsize()
901        return sizes
902
903    def sum_q_sizes(self):
904        sizes=self.get_q_sizes()
905        f=lambda x,y: x+y
906        return reduce(f,[sizes[x] for x in sizes])
907
908    def all_queues_empty(self):
909        sizes=self.get_q_sizes()
910        for x in sizes:
911            if sizes[x] != 0:
912                return False
913        return True
914
915    def get_load(self):
916        # runs on separate thread
917        db = DB(self,'load')
918        db.init_db()
919        self.cfg = Config(db)
920        ctime=0
921        while not (self.terminate and self.finished): #and not self.paused
922            self.cfg.store('keepalive',int(time.time()))
923            time.sleep(1)
924            self.load=self.get_cpu_load()
925            ctime+=1
926            self.schedule(event='CHECK_SANITY')
927            if ctime >CHECK_LOAD_TIME:
928                self.cfg.write()
929                ctime=0.0
930
931                self.MEM_USED=self.get_mem_usage()
932                self.server_mem=self.get_server_free_mem()
933
934                if self.server_variables and 'max_heap_table_size' in self.server_variables:
935                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
936                    if self.max_heap < WARNING_MIN_HEAP:
937                        printea("******************** MIN HEAP IS TOO SMALL -> {} < {} ********************".format(self.max_heap,WARNING_MIN_HEAP),'warning')
938
939                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='tmp_clients')
940                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='tmp_packages')
941                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='Client_Versions')
942                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='RecvPackages')
943                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
944                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
945                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
946                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
947                qempty = self.all_queues_empty()
948                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
949                    db.reset_autoinc()
950                self.print_stats()
951                #self.load=os.getloadavg()[0]
952                if qempty:
953                    gc.collect()
954            #end if
955        #end while
956        db.close_db()
957
958    def start(self):
959        self.prepare_threads()
960
961    def end(self):
962        for x in self.db:
963            self.threads[x].join()
964            self.db[x].close_db()
965        self.finished = True
966        self.print_stats()
967
968
969class Config():
970    def __init__(self,connection):
971        self._db = connection
972        self.read()
973
974    def store(self,var,value):
975        var=var.replace(' ','_')
976        if isinstance(value,str):
977            if value.isnumeric():
978                setattr(self,var,int(value))
979            else:
980                setattr(self,var,str(value))
981        else:
982            setattr(self,var,int(value))
983
984    def write(self):
985        values={}
986        for x in self.get_internal_vars():
987            values.setdefault(str(x)[:20],str(getattr(self,x))[:45])
988        if self._db:
989            self._db.put_config(values=values)
990        the_string="CONFIGURATION\n"
991        if USE_CONFIG_FILE_LOG:
992            fnbuild = lambda kv : str(kv[0])+'='+str(kv[1])+"\n"
993            the_string+=reduce((lambda x,y: x+y) ,map(fnbuild,values.items()))
994            try:
995                with open(CONFIG_FILE_LOG,'w') as fp:
996                    fp.write(the_string)
997            except:
998                pass
999
1000    def read(self):
1001        if self._db:
1002            config=self._db.get_config()
1003            for key in config.keys():
1004                if config[key].isnumeric():
1005                    setattr(self,key,int(config[key]))
1006                else:
1007                    setattr(self,key,config[key])
1008        else:
1009            printea('No config yet')
1010
1011    def get_internal_vars(self):
1012        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
1013
1014    def print(self):
1015        for v in self.get_internal_vars():
1016            print('{} = {}'.format(v,getattr(self,v)))
1017
1018
1019def main(*args,**kwargs):
1020    gc.enable()
1021    if DAEMON:
1022        fp = open('/var/run/analyticsd.pid','w');
1023        fp.write(str(os.getpid()))
1024        fp.close()
1025    m = Monitor()
1026    m.start()
1027    printea("start done",'info')
1028    while not m.terminate:
1029        time.sleep(0.5)
1030    m.end()
1031    printea("Exitting...",'info')
1032
1033if __name__ == "__main__":
1034    exit = 0
1035    keyword='analyticsd'
1036    interpreter='python3'
1037    for proc in psutil.process_iter():
1038        a=False
1039        b=False
1040        for argument in proc.cmdline():
1041            #print('{} {} {}'.format(proc.cmdline,keyword,argument[-len(keyword):]))
1042            if interpreter in argument[-len(interpreter):]:
1043                a = True
1044            if keyword in argument[-len(keyword):]:
1045                b = True
1046            if a and b:
1047                exit = exit +1
1048    if exit > 1:
1049        printea('Another daemon is running','error')
1050        sys.exit(1)
1051
1052    lck = '/var/run/analyticsd'
1053    if DAEMON:
1054        if os.path.isfile(lck+'.lock'):
1055            printea('Lockfile {} detected, unable to start'.format(lck),'error')
1056            sys.exit(1)
1057        else:
1058            try:
1059                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
1060                    main()
1061            except Exception as e:
1062                printea(e)
1063                sys.exit(1)
1064    else:
1065        main()
Note: See TracBrowser for help on using the repository browser.