source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 6774

Last change on this file since 6774 was 6774, checked in by mabarracus, 20 months ago

fix bug

  • Property svn:executable set to *
File size: 34.4 KB
Line 
1#!/usr/bin/python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39
40if DEBUG:
41    if MIN_LOG_LEVEL:
42        loglevel=MIN_LOG_LEVEL
43    else:
44        loglevel=logging.DEBUG
45else:
46    loglevel=logging.INFO
47
48LOGGING = {
49    'version': 1,
50    'disable_existing_loggers': False,
51    'formatters': {
52        'verbose': {
53            'format': '%(levelname)s %(module)s %(message)s'
54            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
55            },
56        },
57    'handlers': {
58        'stdout': {
59            'class': 'logging.StreamHandler',
60            'stream': sys.stdout,
61            'formatter': 'verbose',
62            },
63        'sys-logger6': {
64            'class': 'logging.handlers.SysLogHandler',
65            'address': '/dev/log',
66            'facility': "local6",
67            'formatter': 'verbose',
68            },
69        },
70    'loggers': {
71        'analyticsd-logger': {
72            'handlers': ['sys-logger6','stdout'],
73            'level': loglevel,
74            'propagate': True,
75            },
76        }
77    }
78
79def to_str(x):
80    if isinstance(x,str):
81        return to_utf(x).decode('unicode_escape')
82    else:
83        return x
84
85def to_utf(x):
86    if isinstance(x,str):
87        return x.encode('utf-8')
88    else:
89        return x
90
91def printea(msg="",level='critical'):
92    if level == 'critical':
93        logger.critical(msg)
94    elif level == 'info':
95        logger.info(msg)
96    else:
97        logger.debug(msg)
98
99def keepalive(who=""):
100    global DEBUG_PRINT_ALIVE_COUNTER
101    t=str(int(time.time()))
102    if DEBUG:
103        printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
104    else:
105        if who == 'main':
106            if DEBUG_PRINT_ALIVE_COUNTER > 0:
107                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
108            else:
109                DEBUG_PRINT_ALIVE_COUNTER=60
110                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
111
112    fp = open('/var/run/analyticsd.keepalive','w');
113    fp.write(t)
114    fp.close()
115
116config.dictConfig(LOGGING)
117logger = logging.getLogger('analyticsd-logger')
118
119DBNAME=None
120USER=None
121PASS=None
122IP=None
123
124EMPTY_PAUSED_SLEEP=1
125CHECK_LOAD_TIME=5
126MAX_RETRIES=5
127TIMED_ON=False
128
129try:
130    with open(FILE,'r') as f:
131        for line in f:
132            if not IP:
133                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
134                if IP:
135                    IP = IP.group(1)
136            if not DBNAME:
137                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
138                if DBNAME:
139                    DBNAME = DBNAME.group(1)
140            if not USER:
141                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
142                if USER:
143                    USER = USER.group(1)
144            if not PASS:
145                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
146                if PASS:
147                    PASS = PASS.group(1)
148    if not (IP or DBNAME or USER or PASS):
149        printea("Couldn't get database configuration from {}".format(FILE))
150    else:
151        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
152except Exception as e:
153    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
154    sys.exit(1)
155
156if not (IP or DBNAME or USER or PASS):
157    printea("Couldn't get database configuration from {}".format(FILE))
158    sys.exit(1)
159
160if THREADED:
161    THREADS=['main','server','client','desktop','other']
162else:
163    THREADS=['main']
164
165class DB():
166    def __init__(self,mon,t='main'):
167        self.t=t
168        self.reconnect=0
169        self.empty = False
170        self.conn=None
171        self.mon=mon
172        self.q=queue.Queue()
173        self.processed=0
174        self.need_wait = False
175        self.need_clean = False
176        printea('Database worker {} initialized'.format(t),'info')
177
178    def timed(func):
179        @wraps(func)
180        def wrapper(*args,**kwargs):
181            if TIMED_ON:
182                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
183            ret=func(*args,**kwargs)
184            if TIMED_ON:
185                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
186            return ret
187        return wrapper
188
189    def with_retry(func):
190        @wraps(func)
191        def wrapper(*args,**kwargs):
192            if 'retry' not in kwargs:
193                kwargs['retry']=1
194            if 'mon' not in kwargs:
195                kwargs['mon'] = None
196            try:
197                return func(*args,**kwargs) 
198            except Exception as e:
199                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
200                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
201                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
202                    if kwargs['mon']:
203                        kwargs['mon'].term()
204                        sys.exit(1)
205                    return None
206                else:
207                    time.sleep(kwargs['retry']**2)
208                    kwargs['retry']+=1
209                    return wrapper(*args,**kwargs)
210            return result
211        return wrapper
212
213    def with_debug(func):
214        @wraps(func)
215        def wrapper(*args,**kwargs):
216            if 'query' in kwargs:
217                printea("executing query: {}".format(kwargs['query']),'debug')
218            return func(*args,**kwargs)
219        return wrapper
220
221    @with_debug
222    def execute(self,*args,**kwargs):
223        if 'query' not in kwargs:
224            printea("Warning execute called whithout query",'info')
225            return None
226        try:
227            return self.cur.execute(kwargs['query'])
228        except Exception as e:
229            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
230
231    def init_db(self):
232        try:
233            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
234            self.conn.autocommit(False)
235            self.cur = self.conn.cursor()
236            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
237            variables=self.check_server_variables()
238            if variables:
239                self.mon.server_variables=variables
240            printea("Connected succesfully {}".format(self.t),'info')
241
242        except mdb.Error as e:
243            printea("Error {}: {}".format(e.args[0],e.args[1]))
244            raise Exception(e)
245
246    def check_server_variables(self):
247        if self.t != 'main':
248            return None
249        else:
250            printea('Getting server variables','info')
251            result = {}
252            self.cur.execute("show global variables")
253            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
254            if self.cur.rowcount > 0:
255                for i in range(self.cur.rowcount):
256                    var_name, var_value = self.cur.fetchone()
257                    result.setdefault(var_name,var_value)
258            return result
259
260    def get_config(self):
261        values={}
262        self.cur.execute('select name,value from Config')
263        for val in self.cur.fetchall():
264            values.setdefault(val[0],val[1])
265        return values
266
267    def put_config(self,values):
268        vals=[]
269        for x in values.keys():
270            vals.append("('{}','{}')".format(x,values[x]))
271        try:
272            self.cur.execute("insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
273            self.conn.commit()
274        except Exception as e:
275            print(e)
276
277    def check_temporary_tables_size(self,table_name):
278        if self.t != 'load':
279            return None
280        else:
281            size = 0
282            rows = 0
283            self.cur.execute('select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
284
285            res=self.cur.fetchone()
286            size = float(res[0])
287            rows = int(res[1])
288            return (int(size),rows)
289
290    def close_db(self):
291        if self.conn:
292            self.conn.close()
293            printea("Closed connection {}".format(self.t),'info')
294
295    def reduce_flavour(self,version,flavour):
296        if version == '15':
297            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
298                return 'desktop'
299            elif 'server' in flavour:
300                return 'server'
301            elif 'client' in flavour:
302                return 'client' 
303        elif version == '16':
304            if 'server' in flavour:
305                return 'server'
306            elif 'client' in flavour:
307                return 'client'
308            elif 'desktop' in flavour:
309                return 'desktop'
310        return 'other'
311
312    def reduce_version(self,version):
313        if version[0:2] in ['15','16']:
314            return version[0:2]
315        else:
316            return 'other'
317
318    def gen_uuid(self,*args,**kwargs):
319        try:
320            string=to_utf(u'-'.join([str(x) for x in args]))
321            h=hashlib.sha1(string)
322            return int(h.hexdigest()[0:16],16)
323        except Exception as e:
324            print(traceback.format_exc())
325
326    @timed
327    @with_retry
328    def get_client(self,*args,**kwargs):
329        try:
330            query="SELECT id,date,user,version,sabor from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
331            self.execute(query=query)
332            ret =[]
333            if self.cur.rowcount > 0:
334                for i in range(self.cur.rowcount):
335                    v_id,v_date,v_user,v_version,v_flavour=self.cur.fetchone()
336                    version=self.reduce_version(v_version)
337                    flavour=self.reduce_flavour(version,v_flavour)
338                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
339                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour})
340                return ret
341            else:
342                return True
343        except Exception as e:
344            raise Exception("Error getting client: {}".format(e))
345
346    @timed
347    @with_retry
348    def get_apps(self,*args,**kwargs):
349        if 'clients' not in kwargs:
350            printea("Warning executed without named parameter clients",'info')
351            return None
352        ret = {}
353        try:
354            cids_to_rel_fla={}
355            for c in kwargs['clients']:
356                if c['id'] not in cids_to_rel_fla:
357                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
358                if c['flavour'] not in ret:
359                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
360            for c in kwargs['clients']:
361                ret[c['flavour']]['clients'].append(c)
362
363            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
364            self.execute(query=query)
365            if self.cur.rowcount > 0:
366                for i in range(self.cur.rowcount):
367                    row = self.cur.fetchone()
368                    clid = row[0]
369                    rel,fla = cids_to_rel_fla[clid]
370                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
371                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
372            return ret
373        except Exception as e:
374            raise Exception("Error getting apps: {}".format(e))
375
376    @timed
377    @with_retry
378    def put_client(self,*args,**kwargs):
379        if 'client_list' not in kwargs:
380            raise Exception("Called without named parameter client_list")
381        values= []
382        for cli in kwargs['client_list']:
383            values.append("({},'{}','{}','{}','{}','{}','{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour']))
384        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
385        self.execute(query=query)
386        return True
387
388    @timed
389    @with_retry
390    def put_apps(self,*args,**kwargs):
391        if 'apps' not in kwargs:
392            raise Exception("Called without named parameter apps")
393        app_list = {}
394        for app in kwargs['apps']:
395            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
396            if str(app['uuid']) not in app_list:
397                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
398            else:
399                app_list[str(app['uuid'])]['value']+=app['value']
400        values = []
401        for app in app_list:
402            item=app_list[app]
403            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
404        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
405        self.execute(query=query)
406        return True
407
408    @timed
409    @with_retry
410    def del_client(self,*args,**kwargs):
411        if 'client_list' not in kwargs:
412            raise Exception("Called without named parameter client_list")
413        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
414        self.execute(query=query)
415        return True
416
417    @timed
418    @with_retry
419    def del_apps(self,*args,**kwargs):
420        if 'client_list' not in kwargs:
421            raise Exception("Called without named parameter client_list")
422        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
423        self.execute(query=query)
424        return True
425
426    @timed
427    def reset_autoinc(self):
428        try:
429            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
430            self.execute(query=query)
431            ainc = self.cur.fetchone()[0]
432            query = "SELECT count(*) from tmp_clients"
433            self.execute(query=query)
434            cli_size=self.cur.fetchone()[0]
435            query = "SELECT count(*) from tmp_packages"
436            self.execute(query=query)
437            pkg_size=self.cur.fetchone()[0]
438            printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
439            if ainc > 65500 and self.mon.all_queues_empty() and cli_size == 0 and pkg_size == 0:
440                query = "TRUNCATE TABLE tmp_clients"
441                self.execute(query=query)
442                query = "TRUNCATE TABLE tmp_packages"
443                self.execute(query=query)
444            return True
445        except Exception as e:
446            raise Exception("Error reseting auto_increment: {}".format(e))
447
448
449    @timed
450    def process_main_thread(self,*args,**kwargs):
451        if self.need_wait:
452            time.sleep(EMPTY_PAUSED_SLEEP)
453        clis=self.get_client(mon=self.mon)
454        if clis == True: #No clients found (empty)
455            self.empty = True # if > 65500 auto_increment was reset
456            self.mon.schedule(event='NO_MORE_CLIENTS')
457            return False
458        #clients found
459        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
460        # if clients found get apps
461        lapps = self.get_apps(clients=clis,mon=self.mon)
462        #lapps can be empty
463        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
464        #If deletion was failed , thread was died
465        lapps_tmp={'apps':[],'clients':[]}
466        for fla in lapps:
467            if THREADED:
468                lapps[fla]['timestamp']=time.time()
469                self.mon.db[fla].q.put(lapps[fla],True)
470            else:
471                lapps_tmp['apps'].extend(lapps[fla]['apps'])
472                lapps_tmp['clients'].extend(lapps[fla]['clients'])
473                self.mon.db['main'].q.put(lapps_tmp,True)
474        #if DEBUG:
475        self.processed+=len(clis)
476        return True
477
478    @timed
479    def process_all_threads(self,*args,**kwargs):
480        lapps=self.q.get(True)
481        #print "Running {}".format(self.t)
482        if THREADED:
483            while (lapps['timestamp'] > self.mon.commited):
484                time.sleep(0.001)
485        if len(lapps['apps']) != 0 and len(lapps['clients']) != 0:
486            #IF FAIL, AFTER RETRIES THREAD DIES
487            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
488                self.q.put(lapps,True) # USELESS
489                return False    # USELESS
490            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
491                self.q.put(lapps,True) # USELESS
492                return False    # USELESS
493            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
494                self.q.put(lapps,True) # USELESS
495                return False    # USELESS
496        #if DEBUG:
497        self.processed+=len(lapps['clients'])
498        return True
499
500    def process(self,*args,**kwargs):
501        keepalive(self.t)
502        # code only for main thread
503        printea("Running thread {}".format(self.t),'debug')
504        if self.t == 'main' and not self.mon.terminate:
505           
506            ret=self.process_main_thread(*args,**kwargs)
507            if ret == False: #No more clients available
508                return True
509            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
510                if THREADED:
511                    return True #Need return to avoid put empty flag and wait into main thread
512
513        #after this poing, code for all threads
514        if not self.q.empty():
515            ret=self.process_all_threads(*args,**kwargs)
516            if ret == False: # USELESS? THREADS was died
517                printea("Error threads")
518                return ret
519        else: 
520            del self.q
521            self.q = queue.Queue()
522            self.empty = True
523        return True
524
525    def worker(self):
526        printea("Starting worker {} processing".format(self.t),'info')
527        while not (self.mon.terminate and self.empty):
528            if self.mon.paused or self.empty:
529                if self.empty and self.t == 'main':
530                    self.reset_autoinc()
531                if self.mon.paused:
532                    printea("Paused by high load {}".format(self.t),'debug')
533                if self.empty:
534                    printea("Empty queue {} sleeping by now".format(self.t),'debug')
535                if self.empty:
536                    self.empty = False
537                time.sleep(EMPTY_PAUSED_SLEEP)
538            else:
539                try:
540                    if self.conn == None:
541                        raise Exception('Connection not available, probably it\'s a first run')
542                    else:
543                        self.conn.begin()
544                    if self.process():
545                        self.conn.commit()
546                        self.mon.commited=time.time()
547                        self.reconnect = 0
548                        if self.need_clean:
549                            gc.collect()
550                    else:
551                        self.conn.rollback()
552                except Exception as e:
553                    try:
554                        if self.conn != None:
555                            self.conn.rollback()
556                    except:
557                        printea("Can't rollback last actions",'info')
558                        pass
559                    #if e[0] != 2006:
560                    #    printea("Exception processing worker({}): {}".format(self.t,e))
561                    #if e[0] == 2006: # SERVER GONE AWAY
562                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
563                    printea("Trying to recover connection ({})".format(self.t))
564                    if self.reconnect == 100:
565                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
566                        self.mon.term()
567                    else:
568                        self.reconnect+=1
569                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
570                        time.sleep(self.reconnect*self.reconnect)
571                        try:
572                            self.init_db()
573                            printea("Recovered worker {} connection".format(self.t),'info')
574                        except:
575                            printea('Unable to initialize worker {}'.format(self.t))
576                            pass
577
578
579class Monitor():
580    def __init__(self):
581        self.MAX_QUEUE_UTILIZATION = 100
582        self.USE_MAX_QUEUES = False
583        self.MAX_SELECT_WINDOW = (2 ** 13) +1
584        self.MIN_SELECT_WINDOW = 32
585        self.MEM_USED=0
586        self.MAX_MEM=512
587        self.MIN_FREE_MEM_SERVER=100
588        self.USE_MAX_MEM=True
589        self.lock = Lock()
590        self.terminate = False
591        self.finished = False
592        self.paused = False
593        self.select_window = self.MIN_SELECT_WINDOW
594        self.commited = time.time()
595        self.procesed = 0
596        self.procesed_per_sec = [0]*10
597        self.load = False
598        self.server_variables = None  # initialized by main worker
599        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
600        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
601        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
602        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
603
604        self.server_mem = None
605        self.loadlist = [ 0.0 ] * 100
606        signal.signal(signal.SIGQUIT,self.term)
607        signal.signal(signal.SIGTERM,self.term)
608        signal.signal(signal.SIGINT,self.term)
609
610        self.db = {}
611        self.threads = {}
612        for x in THREADS:
613            self.db[x] = DB(self,x)
614            #try:
615            #    self.db[x].init_db()
616            #except Exception as e:
617            #    printea('Error initializing database connections: {}'.format(str(e)))
618            #    sys.exit(1)
619        self.cfg= None
620        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
621
622    def schedule(self,*args,**kwargs):
623        if kwargs['event']=='NO_MORE_CLIENTS':
624            if self.select_window > self.MIN_SELECT_WINDOW:
625                self.select_window/=2
626                self.select_window=int(self.select_window)
627        if kwargs['event']=='HAVE_CLIENTS':
628            if kwargs['nclients'] == self.select_window:
629                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
630                    if self.select_window > self.MIN_SELECT_WINDOW:
631                        self.select_window/=2
632                        self.select_window=int(self.select_window)
633                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
634                    if self.select_window > self.MIN_SELECT_WINDOW:
635                        self.select_window/=2
636                        self.select_window=int(self.select_window)
637                else:
638                    if self.select_window*2 < self.MAX_SELECT_WINDOW:
639                        self.select_window*=2
640                        self.select_window=int(self.select_window)
641            elif kwargs['nclients']<self.select_window/2:
642                self.select_window/=2
643                self.select_window=int(self.select_window)
644
645        if kwargs['event']=='CHECK_SANITY':
646            #nprocs=multiprocessing.cpu_count()
647            #limit=nprocs*LIMIT
648
649            self.load=self.get_cpu_load()
650            if not self.terminate and self.load > LIMIT:
651                self.paused = True
652            else:
653                self.paused = False
654
655            self.MEM_USED=self.get_mem_usage()
656            self.server_mem=self.get_server_free_mem()
657
658            max_heap=None
659            if self.server_variables and 'max_heap_table_size' in self.server_variables:
660                max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
661            self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
662            self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
663            self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
664            self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
665            if max_heap and self.temporary_tables_size['sum']:
666                if self.temporary_tables_size['sum'] > max_heap * 0.4:
667                    if self.select_window*2 < self.MAX_SELECT_WINDOW:
668                        self.select_window *= 2
669                        self.select_window=int(self.select_window)
670                    if self.paused:
671                        #printea('Hitting max temporary table size unpausing','critical')
672                        self.paused = False
673            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
674                #printea('Hitting max memory from server collecting and reducing window','critical')
675                if self.select_window > self.MIN_SELECT_WINDOW:
676                    self.select_window/=2
677                    self.select_window=int(self.select_window)
678                self.db['main'].need_wait=True
679                self.USE_MAX_QUEUES=True
680                for x in THREADS:
681                    self.db[x].need_clean=True
682                gc.collect()
683            else:
684                self.db['main'].need_wait=False
685                self.USE_MAX_QUEUES=False
686                for x in THREADS:
687                    self.db[x].need_clean=True
688
689    def get_config(self):
690        pass
691
692    def get_cpu_load(self):
693        self.loadlist.append(psutil.cpu_percent())
694        self.loadlist=self.loadlist[1:]
695        avg=0.0
696        for x in self.loadlist:
697            avg+=x
698        return round(avg/100.0,2)
699
700    def put_config(self,key='',value=''):
701        pass
702
703    def term(self,*args,**kwargs):
704        printea("Begin kill the program, wait please...",'info')
705        self.terminate=True
706
707    def prepare_threads(self):
708        global DAEMON
709
710        self.threads['load']=Process(target=self.get_load)
711        self.threads['load'].daemon = DAEMON
712        self.threads['load'].start()
713        for x in THREADS:
714            self.threads[x]=Process(target=self.db[x].worker)
715            self.threads[x].daemon = DAEMON
716            self.threads[x].start()
717
718    def get_mem_usage(self):
719        process = psutil.Process(os.getpid())
720        mem = process.memory_info()[0] / float(2 ** 20)
721        return mem
722
723    def get_server_free_mem(self):
724        mem = psutil.virtual_memory()
725        return mem.free / (2 ** 20)
726
727    def print_stats(self):
728        global CHECK_LOAD_TIME
729#        if DEBUG:
730        if True:
731            out="Processed: "
732            out2=''
733            total=0
734            for x in THREADS:
735                out2+='{}={} '.format(x,self.db[x].processed)
736                self.cfg.store('processed '+x,self.db[x].processed)
737                if THREADED:
738                    if x != 'main':
739                        total += self.db[x].processed
740                else:
741                    total += self.db[x].processed
742            out += "TOTAL={} {}".format(total,out2)
743            self.cfg.store('processed total',total)
744            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
745            self.procesed_per_sec.append(proc_per_sec)
746            self.procesed_per_sec=self.procesed_per_sec[-10:]
747            f=lambda x,y:x+y
748            suma_proc=reduce(f,self.procesed_per_sec)
749            self.cfg.store('processing at',int(suma_proc/10))
750            self.procesed=total
751            total = 0
752            if THREADED:
753                out+="Queues: "
754                sizes=self.get_q_sizes()
755                out2=''
756                for x in sizes:
757                    self.cfg.store('queue '+x,sizes[x])
758                    out2+='{}={} '.format(x,sizes[x])
759                    total += sizes[x]
760                self.cfg.store('queue totals',total)
761                out+="TOTAL={} {}".format(total,out2)
762            self.cfg.store('select window',self.select_window)
763            self.cfg.store('mem used',self.MEM_USED)
764            self.cfg.store('load',self.load)
765            if (self.paused):
766                self.cfg.store('paused',1)
767            else:
768                self.cfg.store('paused',0)
769            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
770            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
771            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
772            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
773            if DEBUG:
774                if THREADED:
775                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
776                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
777                else:
778                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
779                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
780
781    def get_q_sizes(self):
782        sizes={}
783        for x in THREADS:
784            sizes[x]=self.db[x].q.qsize()
785        return sizes
786
787    def sum_q_sizes(self):
788        sizes=self.get_q_sizes()
789        f=lambda x,y: x+y
790        return reduce(f,[sizes[x] for x in sizes])
791
792    def all_queues_empty(self):
793        sizes=self.get_q_sizes()
794        for x in sizes:
795            if sizes[x] != 0:
796                return False
797        return True
798
799    def get_load(self):
800        # runs on separate thread
801        db = DB(self,'load')
802        db.init_db()
803        self.cfg = Config(db)
804        ctime=0
805        while not (self.terminate and self.finished): #and not self.paused
806            self.cfg.store('keepalive',int(time.time()))
807            time.sleep(0.5)
808            ctime+=0.5
809            self.schedule(event='CHECK_SANITY')
810
811            if ctime >CHECK_LOAD_TIME:
812                self.cfg.write()
813                ctime=0.0
814                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size('tmp_clients')
815                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size('tmp_packages')
816                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size('Client_Versions')
817                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size('RecvPackages')
818                self.print_stats()
819                #self.load=os.getloadavg()[0]
820                if self.all_queues_empty():
821                    gc.collect()
822            #end if
823        #end while
824        db.close_db()
825
826    def start(self):
827        self.prepare_threads()
828
829    def end(self):
830        for x in self.db:
831            self.threads[x].join()
832            self.db[x].close_db()
833        self.finished = True
834        self.print_stats()
835
836
837class Config():
838    def __init__(self,connection):
839        self._db = connection
840        self.read()
841
842    def store(self,var,value):
843        var=var.replace(' ','_')
844        if isinstance(value,str):
845            if value.isnumeric():
846                setattr(self,var,int(value))
847            else:
848                setattr(self,var,str(value))
849        else:
850            setattr(self,var,int(value))
851
852    def write(self):
853        if self._db:
854            values={}
855            for x in self.get_internal_vars():
856                values.setdefault(str(x),str(getattr(self,x)))
857            self._db.put_config(values)
858
859    def read(self):
860        if self._db:
861            config=self._db.get_config()
862            for key in config.keys():
863                if config[key].isnumeric():
864                    setattr(self,key,int(config[key]))
865                else:
866                    setattr(self,key,config[key])
867        else:
868            print('No config yet')
869
870    def get_internal_vars(self):
871        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
872
873    def print(self):
874        for v in self.get_internal_vars():
875            print('{} = {}'.format(v,getattr(self,v)))
876
877
878def main(*args,**kwargs):
879    gc.enable()
880    if DAEMON:
881        fp = open('/var/run/analyticsd.pid','w');
882        fp.write(str(os.getpid()))
883        fp.close()
884    m = Monitor()
885    m.start()
886    printea("start done",'info')
887    while not m.terminate:
888        time.sleep(0.5)
889    m.end()
890    printea("Exitting...",'info')
891
892if __name__ == "__main__":
893    size=len([ p.name() for p in psutil.process_iter() if p.name() == 'analyticsd' ])
894    if size > 1:
895        printea("Another daemon running... exitting now!")
896        sys.exit(1)
897    if DAEMON:
898        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]):
899            main()
900    else:
901        main()
Note: See TracBrowser for help on using the repository browser.