source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 6812

Last change on this file since 6812 was 6812, checked in by mabarracus, 19 months ago

Some optimizations

  • Property svn:executable set to *
File size: 35.8 KB
Line 
1#!/usr/bin/env python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7#import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39MAX_SLOW_CHECK_CLIENTS = 30
40
41if DEBUG:
42    if MIN_LOG_LEVEL:
43        loglevel=MIN_LOG_LEVEL
44    else:
45        loglevel=logging.DEBUG
46else:
47    loglevel=logging.INFO
48
49LOGGING = {
50    'version': 1,
51    'disable_existing_loggers': False,
52    'formatters': {
53        'verbose': {
54            'format': '%(levelname)s %(module)s %(message)s'
55            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
56            },
57        },
58    'handlers': {
59        'stdout': {
60            'class': 'logging.StreamHandler',
61            'stream': sys.stdout,
62            'formatter': 'verbose',
63            },
64        'sys-logger6': {
65            'class': 'logging.handlers.SysLogHandler',
66            'address': '/dev/log',
67            'facility': "local6",
68            'formatter': 'verbose',
69            },
70        },
71    'loggers': {
72        'analyticsd-logger': {
73            'handlers': ['sys-logger6','stdout'],
74            'level': loglevel,
75            'propagate': True,
76            },
77        }
78    }
79
80def to_str(x):
81    if isinstance(x,str):
82        return to_utf(x).decode('unicode_escape')
83    else:
84        return x
85
86def to_utf(x):
87    if isinstance(x,str):
88        return x.encode('utf-8')
89    else:
90        return x
91
92def printea(msg="",level='critical'):
93    if level == 'critical':
94        logger.critical(msg)
95    elif level == 'error':
96        logger.error(msg)
97    elif level == 'warning':
98        logger.warning(msg)
99    elif level == 'info':
100        logger.info(msg)
101    else:
102        logger.debug(msg)
103
104def keepalive(who=""):
105    global DEBUG_PRINT_ALIVE_COUNTER
106    t=str(int(time.time()))
107    if DEBUG:
108        printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
109    else:
110        if who == 'main':
111            if DEBUG_PRINT_ALIVE_COUNTER > 0:
112                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
113            else:
114                DEBUG_PRINT_ALIVE_COUNTER=60
115                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
116
117    fp = open('/var/run/analyticsd.keepalive','w');
118    fp.write(t)
119    fp.close()
120
121config.dictConfig(LOGGING)
122logger = logging.getLogger('analyticsd-logger')
123
124DBNAME=None
125USER=None
126PASS=None
127IP=None
128
129EMPTY_PAUSED_SLEEP=1
130CHECK_LOAD_TIME=5
131MAX_RETRIES=5
132TIMED_ON=False
133
134try:
135    with open(FILE,'r') as f:
136        for line in f:
137            if not IP:
138                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
139                if IP:
140                    IP = IP.group(1)
141            if not DBNAME:
142                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
143                if DBNAME:
144                    DBNAME = DBNAME.group(1)
145            if not USER:
146                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
147                if USER:
148                    USER = USER.group(1)
149            if not PASS:
150                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
151                if PASS:
152                    PASS = PASS.group(1)
153    if not (IP or DBNAME or USER or PASS):
154        printea("Couldn't get database configuration from {}".format(FILE))
155    else:
156        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
157except Exception as e:
158    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
159    sys.exit(1)
160
161if not (IP or DBNAME or USER or PASS):
162    printea("Couldn't get database configuration from {}".format(FILE))
163    sys.exit(1)
164
165if THREADED:
166    THREADS=['main','server','client','desktop','other']
167else:
168    THREADS=['main']
169
170class DB():
171    def __init__(self,mon,t='main'):
172        self.t=t
173        self.reconnect=0
174        self.empty = False
175        self.conn=None
176        self.mon=mon
177        self.q=queue.Queue()
178        self.processed=0
179        self.need_clean = False
180        printea('Database worker {} initialized'.format(t),'info')
181
182    def timed(func):
183        @wraps(func)
184        def wrapper(*args,**kwargs):
185            if TIMED_ON:
186                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
187            ret=func(*args,**kwargs)
188            if TIMED_ON:
189                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
190            return ret
191        return wrapper
192
193    def with_retry(func):
194        @wraps(func)
195        def wrapper(*args,**kwargs):
196            if 'retry' not in kwargs:
197                kwargs['retry']=1
198            if 'mon' not in kwargs:
199                kwargs['mon'] = None
200            try:
201                return func(*args,**kwargs) 
202            except Exception as e:
203                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
204                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
205                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
206                    if kwargs['mon']:
207                        kwargs['mon'].term()
208                        sys.exit(1)
209                    return None
210                else:
211                    time.sleep(kwargs['retry']**2)
212                    kwargs['retry']+=1
213                    return wrapper(*args,**kwargs)
214            return result
215        return wrapper
216
217    def with_debug(func):
218        @wraps(func)
219        def wrapper(*args,**kwargs):
220            if 'query' in kwargs:
221                printea("executing query: {}".format(kwargs['query']),'debug')
222            return func(*args,**kwargs)
223        return wrapper
224
225    @with_debug
226    def execute(self,*args,**kwargs):
227        if 'query' not in kwargs:
228            printea("Warning execute called whithout query",'info')
229            return None
230        try:
231            return self.cur.execute(kwargs['query'])
232        except Exception as e:
233            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
234
235    def init_db(self):
236        try:
237            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
238            self.conn.autocommit(False)
239            self.cur = self.conn.cursor()
240            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
241            variables=self.check_server_variables()
242
243            if variables:
244                self.mon.server_variables=variables
245
246            printea("Connected succesfully {}".format(self.t),'info')
247
248        except mdb.Error as e:
249            printea("Error {}: {}".format(e.args[0],e.args[1]))
250            raise Exception(e)
251
252    def check_server_variables(self):
253        if self.t != 'main':
254            return None
255        else:
256            printea('Getting server variables','info')
257            result = {}
258            self.cur.execute("show global variables")
259            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
260            if self.cur.rowcount > 0:
261                for i in range(self.cur.rowcount):
262                    var_name, var_value = self.cur.fetchone()
263                    result.setdefault(var_name,var_value)
264            return result
265
266    def get_config(self):
267        values={}
268        self.cur.execute('select name,value from Config')
269        for val in self.cur.fetchall():
270            values.setdefault(val[0],val[1])
271        return values
272
273    def put_config(self,values):
274        vals=[]
275        for x in values.keys():
276            vals.append("('{}','{}')".format(x,values[x]))
277        try:
278            self.cur.execute("insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
279            self.conn.commit()
280        except Exception as e:
281            print(e)
282
283    def check_temporary_tables_size(self,table_name):
284        if self.t != 'load':
285            return None
286        else:
287            size = 0
288            rows = 0
289            self.cur.execute('select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
290
291            res=self.cur.fetchone()
292            size = float(res[0])
293            rows = int(res[1])
294            return (int(size),rows)
295
296    def close_db(self):
297        if self.conn:
298            self.conn.close()
299            printea("Closed connection {}".format(self.t),'info')
300
301    def reduce_flavour(self,version,flavour):
302        if version == '15':
303            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
304                return 'desktop'
305            elif 'server' in flavour:
306                return 'server'
307            elif 'client' in flavour:
308                return 'client' 
309        elif version == '16':
310            if 'server' in flavour:
311                return 'server'
312            elif 'client' in flavour:
313                return 'client'
314            elif 'desktop' in flavour:
315                return 'desktop'
316        return 'other'
317
318    def reduce_version(self,version):
319        if version[0:2] in ['15','16']:
320            return version[0:2]
321        else:
322            return 'other'
323
324    def gen_uuid(self,*args,**kwargs):
325        try:
326            string=to_utf(u'-'.join([str(x) for x in args]))
327            h=hashlib.sha1(string)
328            return int(h.hexdigest()[0:16],16)
329        except Exception as e:
330            print(traceback.format_exc())
331
332    @timed
333    @with_retry
334    def get_client(self,*args,**kwargs):
335        try:
336            query="SELECT id,date,user,version,sabor from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
337            self.execute(query=query)
338            ret =[]
339            if self.cur.rowcount > 0:
340                for i in range(self.cur.rowcount):
341                    v_id,v_date,v_user,v_version,v_flavour=self.cur.fetchone()
342                    version=self.reduce_version(v_version)
343                    flavour=self.reduce_flavour(version,v_flavour)
344                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
345                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour})
346                return ret
347            else:
348                return True
349        except Exception as e:
350            raise Exception("Error getting client: {}".format(e))
351
352    @timed
353    @with_retry
354    def get_apps(self,*args,**kwargs):
355        if 'clients' not in kwargs:
356            printea("Warning executed without named parameter clients",'info')
357            return None
358        ret = {}
359        try:
360            cids_to_rel_fla={}
361            for c in kwargs['clients']:
362                if c['id'] not in cids_to_rel_fla:
363                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
364                if c['flavour'] not in ret:
365                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
366            for c in kwargs['clients']:
367                ret[c['flavour']]['clients'].append(c)
368
369            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
370            self.execute(query=query)
371            if self.cur.rowcount > 0:
372                for i in range(self.cur.rowcount):
373                    row = self.cur.fetchone()
374                    clid = row[0]
375                    rel,fla = cids_to_rel_fla[clid]
376                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
377                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
378            return ret
379        except Exception as e:
380            raise Exception("Error getting apps: {}".format(e))
381
382    @timed
383    @with_retry
384    def put_client(self,*args,**kwargs):
385        if 'client_list' not in kwargs:
386            raise Exception("Called without named parameter client_list")
387        values= []
388        for cli in kwargs['client_list']:
389            values.append("({},'{}','{}','{}','{}','{}','{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour']))
390        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
391        self.execute(query=query)
392        return True
393
394    @timed
395    @with_retry
396    def put_apps(self,*args,**kwargs):
397        if 'apps' not in kwargs:
398            raise Exception("Called without named parameter apps")
399        app_list = {}
400        for app in kwargs['apps']:
401            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
402            if str(app['uuid']) not in app_list:
403                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
404            else:
405                app_list[str(app['uuid'])]['value']+=app['value']
406        values = []
407        for app in app_list:
408            item=app_list[app]
409            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
410        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
411        self.execute(query=query)
412        return True
413
414    @timed
415    @with_retry
416    def del_client(self,*args,**kwargs):
417        if 'client_list' not in kwargs:
418            raise Exception("Called without named parameter client_list")
419        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
420        self.execute(query=query)
421        return True
422
423    @timed
424    @with_retry
425    def del_apps(self,*args,**kwargs):
426        if 'client_list' not in kwargs:
427            raise Exception("Called without named parameter client_list")
428        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
429        self.execute(query=query)
430        return True
431
432    @timed
433    def reset_autoinc(self):
434        try:
435            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
436            self.execute(query=query)
437            ainc = self.cur.fetchone()[0]
438            #query = "SELECT count(*) from tmp_clients"
439            #self.execute(query=query)
440            #cli_size=self.cur.fetchone()[0]
441            #query = "SELECT count(*) from tmp_packages"
442            #self.execute(query=query)
443            #pkg_size=self.cur.fetchone()[0]
444            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
445            if ainc > 65500 and self.mon.all_queues_empty():
446            #and cli_size == 0 and pkg_size == 0:
447                query = "TRUNCATE TABLE tmp_clients"
448                self.execute(query=query)
449                query = "TRUNCATE TABLE tmp_packages"
450                self.execute(query=query)
451            return True
452        except Exception as e:
453            raise Exception("Error reseting auto_increment: {}".format(e))
454
455
456    @timed
457    def process_main_thread(self,*args,**kwargs):
458        if self.mon.slow_check_clients > 0 :
459            self.mon.slow_check_clients -= 1
460            time.sleep(1)
461            return True
462        else:
463            self.mon.slow_check_clients = self.mon.default_slow_check_clients
464
465        clis=self.get_client(mon=self.mon)
466        if clis == True: #No clients found (empty)
467            self.empty = True # if > 65500 auto_increment was reset
468            self.mon.schedule(event='NO_MORE_CLIENTS')
469            return False
470        #clients found
471        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
472        # if clients found get apps
473        lapps = self.get_apps(clients=clis,mon=self.mon)
474        #lapps can be empty
475        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
476        #If deletion was failed , thread was died
477        lapps_tmp={'apps':[],'clients':[]}
478        for fla in lapps:
479            if THREADED:
480                lapps[fla]['timestamp']=time.time()
481                self.mon.db[fla].q.put(lapps[fla],True)
482            else:
483                lapps_tmp['apps'].extend(lapps[fla]['apps'])
484                lapps_tmp['clients'].extend(lapps[fla]['clients'])
485                self.mon.db['main'].q.put(lapps_tmp,True)
486        #if DEBUG:
487        self.processed+=len(clis)
488        return True
489
490    @timed
491    def process_all_threads(self,*args,**kwargs):
492        lapps=self.q.get(True)
493        #print "Running {}".format(self.t)
494        if THREADED:
495            while (lapps['timestamp'] > self.mon.commited):
496                time.sleep(0.001)
497        if len(lapps['apps']) != 0 and len(lapps['clients']) != 0:
498            #IF FAIL, AFTER RETRIES THREAD DIES
499            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
500                self.q.put(lapps,True) # USELESS
501                return False    # USELESS
502            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
503                self.q.put(lapps,True) # USELESS
504                return False    # USELESS
505            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
506                self.q.put(lapps,True) # USELESS
507                return False    # USELESS
508        #if DEBUG:
509        self.processed+=len(lapps['clients'])
510        return True
511
512    def process(self,*args,**kwargs):
513        keepalive(self.t)
514        # code only for main thread
515        printea("Running thread {}".format(self.t),'debug')
516        if self.t == 'main' and not self.mon.terminate:
517            ret=self.process_main_thread(*args,**kwargs)
518            if ret == False: #No more clients available
519                return True
520            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
521                if THREADED:
522                    return True #Need return to avoid put empty flag and wait into main thread
523
524        #after this poing, code for all threads
525        if not self.q.empty():
526            ret=self.process_all_threads(*args,**kwargs)
527            if ret == False: # USELESS? THREADS was died
528                printea("Error threads")
529                return ret
530        else: 
531            del self.q
532            self.q = queue.Queue()
533            self.empty = True
534        return True
535
536    def worker(self):
537        printea("Starting worker {} processing".format(self.t),'info')
538        while not (self.mon.terminate and self.empty):
539            if self.mon.paused or self.empty:
540                #if self.empty and self.t == 'main':
541                #    self.reset_autoinc()
542                if self.mon.paused:
543                    printea("Paused by high load {}".format(self.t),'debug')
544                if self.empty:
545                    printea("Empty queue {} sleeping by now".format(self.t),'debug')
546                if self.empty:
547                    self.empty = False
548                time.sleep(EMPTY_PAUSED_SLEEP)
549            else:
550                try:
551                    if self.conn == None:
552                        raise EnvironmentError('Connection not available, probably it\'s a first run')
553                    else:
554                        self.conn.begin()
555                    if self.process():
556                        self.conn.commit()
557                        self.mon.commited=time.time()
558                        self.reconnect = 0
559                        if self.need_clean:
560                            gc.collect()
561                    else:
562                        self.conn.rollback()
563                except EnvironmentError as e:
564                    printea(e)
565                    self.init_db()
566
567                except Exception as e:
568                    try:
569                        if self.conn != None:
570                            self.conn.rollback()
571                    except:
572                        printea("Can't rollback last actions",'info')
573                        pass
574                    #if e[0] != 2006:
575                    #    printea("Exception processing worker({}): {}".format(self.t,e))
576                    #if e[0] == 2006: # SERVER GONE AWAY
577                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
578
579                    printea("Trying to recover connection ({})".format(self.t))
580                    if self.reconnect == 100:
581                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
582                        self.mon.term()
583                    else:
584                        self.reconnect+=1
585                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
586                        time.sleep(self.reconnect*self.reconnect)
587
588                        try:
589                            self.init_db()
590                            printea("Recovered worker {} connection".format(self.t),'info')
591                        except:
592                            printea('Unable to initialize worker {}'.format(self.t))
593                            pass
594
595
596class Monitor():
597    def __init__(self):
598        self.MAX_QUEUE_UTILIZATION = 100
599        self.USE_MAX_QUEUES = True
600        self.MAX_SELECT_WINDOW = (2 ** 13) +1
601        self.MIN_SELECT_WINDOW = 32
602        self.MEM_USED=0
603        self.MAX_MEM=512
604        self.MIN_FREE_MEM_SERVER=100
605        self.USE_MAX_MEM=True
606        self.lock = Lock()
607        self.terminate = False
608        self.finished = False
609        self.paused = False
610        self.select_window = self.MIN_SELECT_WINDOW
611        self.commited = time.time()
612        self.procesed = 0
613        self.procesed_per_sec = [0]*10
614        self.load = 0
615        self.server_variables = None  # initialized by main worker
616        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
617        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
618        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
619        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
620        self.default_slow_check_clients = 1
621        self.slow_check_clients = self.default_slow_check_clients
622        self.server_mem = 0
623        self.loadlist = [ 0.0 ] * 100
624        self.max_heap = None
625
626        signal.signal(signal.SIGQUIT,self.term)
627        signal.signal(signal.SIGTERM,self.term)
628        signal.signal(signal.SIGINT,self.term)
629
630        self.db = {}
631        self.threads = {}
632        for x in THREADS:
633            self.db[x] = DB(self,x)
634            #try:
635            #    self.db[x].init_db()
636            #except Exception as e:
637            #    printea('Error initializing database connections: {}'.format(str(e)))
638            #    sys.exit(1)
639        self.cfg= None
640        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
641
642    def windowctl(self, *args, **kwargs):
643        if args[0] == '+':
644            if self.select_window*2 < self.MAX_SELECT_WINDOW:
645                self.select_window*=2
646                self.select_window=int(self.select_window)
647        if args[0] == '-':
648            if self.select_window > self.MIN_SELECT_WINDOW:
649                self.select_window/=2
650                self.select_window=int(self.select_window)
651
652    def slowcheckctl(self, *args, **kwargs):
653        if args[0] == 'reset':
654            self.default_slow_check_clients = 0
655        if args[0] == '+':
656            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
657                if self.default_slow_check_clients == 0:
658                    self.default_slow_check_clients = 1
659                else:
660                    self.default_slow_check_clients = self.default_slow_check_clients * 2
661
662
663    def schedule(self, *args, **kwargs):
664        if kwargs['event'] == 'NO_MORE_CLIENTS':
665            self.windowctl('-')
666            self.slowcheckctl('+')
667
668        if kwargs['event'] == 'HAVE_CLIENTS':
669
670            if kwargs['nclients'] == self.select_window:
671                self.windowctl('+')
672            else:
673                self.windowctl('-')
674            self.slowcheckctl('reset')
675#
676#                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
677#                    self.windowctl('+')
678#
679#                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
680#                    self.windowctl('-')
681#                    self.slowcheckctl('+')
682#                else:
683#                    self.windowctl('+')
684#
685#            elif kwargs['nclients'] < self.select_window/2:
686#                self.windowctl('-')
687#
688#            self.slowcheckctl('reset')
689
690        if kwargs['event']=='CHECK_SANITY':
691            # CPU SETTINGS
692            if not self.terminate and self.load > LIMIT:
693                self.paused = True
694            else:
695                self.paused = False
696
697            # DB MEM SETTINGS
698            if self.max_heap and self.temporary_tables_size['sum']:
699                if self.temporary_tables_size['sum'] > self.max_heap * 0.4:
700                    self.windowctl('+')
701                    if self.paused:
702                        #printea('Hitting max temporary table size unpausing','critical')
703                        self.paused = False
704            # SERVER MEM
705            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
706                #printea('Hitting max memory from server collecting and reducing window','critical')
707                self.windowctl('-')
708                self.slowcheckctl('+')
709                self.USE_MAX_QUEUES=True
710                for x in THREADS:
711                    self.db[x].need_clean=True
712                gc.collect()
713            else:
714#                self.USE_MAX_QUEUES=False
715                for x in THREADS:
716                    self.db[x].need_clean=False
717
718            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
719                self.windowctl('-')
720                self.slowcheckctl('+')
721
722            # QUEUES
723            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
724                self.windowctl('+')
725
726
727    def get_cpu_load(self):
728        self.loadlist.append(psutil.cpu_percent())
729        self.loadlist=self.loadlist[1:]
730        avg=0.0
731        for x in self.loadlist:
732            avg+=x
733        return round(avg/100.0,2)
734
735    def put_config(self,key='',value=''):
736        pass
737
738    def term(self,*args,**kwargs):
739        printea("Begin kill the program, wait please...",'info')
740        self.terminate=True
741
742    def prepare_threads(self):
743        global DAEMON
744
745        self.threads['load']=Process(target=self.get_load)
746        self.threads['load'].daemon = DAEMON
747        self.threads['load'].start()
748        for x in THREADS:
749            self.threads[x]=Process(target=self.db[x].worker)
750            self.threads[x].daemon = DAEMON
751            self.threads[x].start()
752
753    def get_mem_usage(self):
754        process = psutil.Process(os.getpid())
755        mem = process.memory_info()[0] / float(2 ** 20)
756        return mem
757
758    def get_server_free_mem(self):
759        mem = psutil.virtual_memory()
760        return mem.free / (2 ** 20)
761
762    def print_stats(self):
763        global CHECK_LOAD_TIME
764#        if DEBUG:
765        if True:
766            out="Processed: "
767            out2=''
768            total=0
769            for x in THREADS:
770                out2+='{}={} '.format(x,self.db[x].processed)
771                self.cfg.store('processed '+x,self.db[x].processed)
772                if THREADED:
773                    if x != 'main':
774                        total += self.db[x].processed
775                else:
776                    total += self.db[x].processed
777            out += "TOTAL={} {}".format(total,out2)
778            self.cfg.store('processed total',total)
779            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
780            self.procesed_per_sec.append(proc_per_sec)
781            self.procesed_per_sec=self.procesed_per_sec[-10:]
782            f=lambda x,y:x+y
783            suma_proc=reduce(f,self.procesed_per_sec)
784            self.cfg.store('processing at',int(suma_proc/10))
785            self.procesed=total
786            total = 0
787            if THREADED:
788                out+="Queues: "
789                sizes=self.get_q_sizes()
790                out2=''
791                for x in sizes:
792                    self.cfg.store('queue '+x,sizes[x])
793                    out2+='{}={} '.format(x,sizes[x])
794                    total += sizes[x]
795                self.cfg.store('queue totals',total)
796                out+="TOTAL={} {}".format(total,out2)
797            self.cfg.store('select window',self.select_window)
798            self.cfg.store('mem used',self.MEM_USED)
799            self.cfg.store('load',self.load)
800            if (self.paused):
801                self.cfg.store('paused',1)
802            else:
803                self.cfg.store('paused',0)
804            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
805            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
806            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
807            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
808            if DEBUG:
809                if THREADED:
810                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
811                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
812                else:
813                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
814                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
815
816    def get_q_sizes(self):
817        sizes={}
818        for x in THREADS:
819            sizes[x]=self.db[x].q.qsize()
820        return sizes
821
822    def sum_q_sizes(self):
823        sizes=self.get_q_sizes()
824        f=lambda x,y: x+y
825        return reduce(f,[sizes[x] for x in sizes])
826
827    def all_queues_empty(self):
828        sizes=self.get_q_sizes()
829        for x in sizes:
830            if sizes[x] != 0:
831                return False
832        return True
833
834    def get_load(self):
835        # runs on separate thread
836        db = DB(self,'load')
837        db.init_db()
838        self.cfg = Config(db)
839        ctime=0
840        while not (self.terminate and self.finished): #and not self.paused
841            self.cfg.store('keepalive',int(time.time()))
842            time.sleep(1)
843            self.load=self.get_cpu_load()
844            ctime+=1
845            self.schedule(event='CHECK_SANITY')
846            if ctime >CHECK_LOAD_TIME:
847                self.cfg.write()
848                ctime=0.0
849
850                self.MEM_USED=self.get_mem_usage()
851                self.server_mem=self.get_server_free_mem()
852
853                if self.server_variables and 'max_heap_table_size' in self.server_variables:
854                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
855
856                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size('tmp_clients')
857                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size('tmp_packages')
858                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size('Client_Versions')
859                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size('RecvPackages')
860                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
861                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
862                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
863                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
864                qempty = self.all_queues_empty()
865                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
866                    db.reset_autoinc()
867                self.print_stats()
868                #self.load=os.getloadavg()[0]
869                if qempty:
870                    gc.collect()
871            #end if
872        #end while
873        db.close_db()
874
875    def start(self):
876        self.prepare_threads()
877
878    def end(self):
879        for x in self.db:
880            self.threads[x].join()
881            self.db[x].close_db()
882        self.finished = True
883        self.print_stats()
884
885
886class Config():
887    def __init__(self,connection):
888        self._db = connection
889        self.read()
890
891    def store(self,var,value):
892        var=var.replace(' ','_')
893        if isinstance(value,str):
894            if value.isnumeric():
895                setattr(self,var,int(value))
896            else:
897                setattr(self,var,str(value))
898        else:
899            setattr(self,var,int(value))
900
901    def write(self):
902        if self._db:
903            values={}
904            for x in self.get_internal_vars():
905                values.setdefault(str(x),str(getattr(self,x)))
906            self._db.put_config(values)
907
908    def read(self):
909        if self._db:
910            config=self._db.get_config()
911            for key in config.keys():
912                if config[key].isnumeric():
913                    setattr(self,key,int(config[key]))
914                else:
915                    setattr(self,key,config[key])
916        else:
917            print('No config yet')
918
919    def get_internal_vars(self):
920        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
921
922    def print(self):
923        for v in self.get_internal_vars():
924            print('{} = {}'.format(v,getattr(self,v)))
925
926
927def main(*args,**kwargs):
928    gc.enable()
929    if DAEMON:
930        fp = open('/var/run/analyticsd.pid','w');
931        fp.write(str(os.getpid()))
932        fp.close()
933    m = Monitor()
934    m.start()
935    printea("start done",'info')
936    while not m.terminate:
937        time.sleep(0.5)
938    m.end()
939    printea("Exitting...",'info')
940
941if __name__ == "__main__":
942    size=len([ p.name() for p in psutil.process_iter() if p.name() == 'analyticsd' ])
943    if size > 1:
944        printea("Another daemon running... exitting now!")
945        sys.exit(1)
946    lck = '/var/run/analyticsd.lock'
947    if DAEMON:
948        if os.path.isfile(lck):
949            printea('Lockfile {} detected, unable to start'.format(lck),'error')
950            sys.exit(1)
951        else:
952            try:
953                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
954                    main()
955            except Exception as e:
956                printea(e)
957                sys.exit(1)
958    else:
959        main()
Note: See TracBrowser for help on using the repository browser.