source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 6775

Last change on this file since 6775 was 6775, checked in by mabarracus, 2 years ago

fix bug

  • Property svn:executable set to *
File size: 34.7 KB
Line 
1#!/usr/bin/python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39
40if DEBUG:
41    if MIN_LOG_LEVEL:
42        loglevel=MIN_LOG_LEVEL
43    else:
44        loglevel=logging.DEBUG
45else:
46    loglevel=logging.INFO
47
48LOGGING = {
49    'version': 1,
50    'disable_existing_loggers': False,
51    'formatters': {
52        'verbose': {
53            'format': '%(levelname)s %(module)s %(message)s'
54            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
55            },
56        },
57    'handlers': {
58        'stdout': {
59            'class': 'logging.StreamHandler',
60            'stream': sys.stdout,
61            'formatter': 'verbose',
62            },
63        'sys-logger6': {
64            'class': 'logging.handlers.SysLogHandler',
65            'address': '/dev/log',
66            'facility': "local6",
67            'formatter': 'verbose',
68            },
69        },
70    'loggers': {
71        'analyticsd-logger': {
72            'handlers': ['sys-logger6','stdout'],
73            'level': loglevel,
74            'propagate': True,
75            },
76        }
77    }
78
79def to_str(x):
80    if isinstance(x,str):
81        return to_utf(x).decode('unicode_escape')
82    else:
83        return x
84
85def to_utf(x):
86    if isinstance(x,str):
87        return x.encode('utf-8')
88    else:
89        return x
90
91def printea(msg="",level='critical'):
92    if level == 'critical':
93        logger.critical(msg)
94    elif level == 'info':
95        logger.info(msg)
96    else:
97        logger.debug(msg)
98
99def keepalive(who=""):
100    global DEBUG_PRINT_ALIVE_COUNTER
101    t=str(int(time.time()))
102    if DEBUG:
103        printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
104    else:
105        if who == 'main':
106            if DEBUG_PRINT_ALIVE_COUNTER > 0:
107                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
108            else:
109                DEBUG_PRINT_ALIVE_COUNTER=60
110                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
111
112    fp = open('/var/run/analyticsd.keepalive','w');
113    fp.write(t)
114    fp.close()
115
116config.dictConfig(LOGGING)
117logger = logging.getLogger('analyticsd-logger')
118
119DBNAME=None
120USER=None
121PASS=None
122IP=None
123
124EMPTY_PAUSED_SLEEP=1
125CHECK_LOAD_TIME=5
126MAX_RETRIES=5
127TIMED_ON=False
128
129try:
130    with open(FILE,'r') as f:
131        for line in f:
132            if not IP:
133                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
134                if IP:
135                    IP = IP.group(1)
136            if not DBNAME:
137                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
138                if DBNAME:
139                    DBNAME = DBNAME.group(1)
140            if not USER:
141                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
142                if USER:
143                    USER = USER.group(1)
144            if not PASS:
145                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
146                if PASS:
147                    PASS = PASS.group(1)
148    if not (IP or DBNAME or USER or PASS):
149        printea("Couldn't get database configuration from {}".format(FILE))
150    else:
151        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
152except Exception as e:
153    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
154    sys.exit(1)
155
156if not (IP or DBNAME or USER or PASS):
157    printea("Couldn't get database configuration from {}".format(FILE))
158    sys.exit(1)
159
160if THREADED:
161    THREADS=['main','server','client','desktop','other']
162else:
163    THREADS=['main']
164
165class DB():
166    def __init__(self,mon,t='main'):
167        self.t=t
168        self.reconnect=0
169        self.empty = False
170        self.conn=None
171        self.mon=mon
172        self.q=queue.Queue()
173        self.processed=0
174        self.need_wait = False
175        self.need_clean = False
176        printea('Database worker {} initialized'.format(t),'info')
177
178    def timed(func):
179        @wraps(func)
180        def wrapper(*args,**kwargs):
181            if TIMED_ON:
182                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
183            ret=func(*args,**kwargs)
184            if TIMED_ON:
185                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
186            return ret
187        return wrapper
188
189    def with_retry(func):
190        @wraps(func)
191        def wrapper(*args,**kwargs):
192            if 'retry' not in kwargs:
193                kwargs['retry']=1
194            if 'mon' not in kwargs:
195                kwargs['mon'] = None
196            try:
197                return func(*args,**kwargs) 
198            except Exception as e:
199                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
200                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
201                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
202                    if kwargs['mon']:
203                        kwargs['mon'].term()
204                        sys.exit(1)
205                    return None
206                else:
207                    time.sleep(kwargs['retry']**2)
208                    kwargs['retry']+=1
209                    return wrapper(*args,**kwargs)
210            return result
211        return wrapper
212
213    def with_debug(func):
214        @wraps(func)
215        def wrapper(*args,**kwargs):
216            if 'query' in kwargs:
217                printea("executing query: {}".format(kwargs['query']),'debug')
218            return func(*args,**kwargs)
219        return wrapper
220
221    @with_debug
222    def execute(self,*args,**kwargs):
223        if 'query' not in kwargs:
224            printea("Warning execute called whithout query",'info')
225            return None
226        try:
227            return self.cur.execute(kwargs['query'])
228        except Exception as e:
229            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
230
231    def init_db(self):
232        try:
233            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
234            self.conn.autocommit(False)
235            self.cur = self.conn.cursor()
236            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
237            variables=self.check_server_variables()
238            if variables:
239                self.mon.server_variables=variables
240            printea("Connected succesfully {}".format(self.t),'info')
241
242        except mdb.Error as e:
243            printea("Error {}: {}".format(e.args[0],e.args[1]))
244            raise Exception(e)
245
246    def check_server_variables(self):
247        if self.t != 'main':
248            return None
249        else:
250            printea('Getting server variables','info')
251            result = {}
252            self.cur.execute("show global variables")
253            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
254            if self.cur.rowcount > 0:
255                for i in range(self.cur.rowcount):
256                    var_name, var_value = self.cur.fetchone()
257                    result.setdefault(var_name,var_value)
258            return result
259
260    def get_config(self):
261        values={}
262        self.cur.execute('select name,value from Config')
263        for val in self.cur.fetchall():
264            values.setdefault(val[0],val[1])
265        return values
266
267    def put_config(self,values):
268        vals=[]
269        for x in values.keys():
270            vals.append("('{}','{}')".format(x,values[x]))
271        try:
272            self.cur.execute("insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
273            self.conn.commit()
274        except Exception as e:
275            print(e)
276
277    def check_temporary_tables_size(self,table_name,is_tmp=False):
278        if self.t != 'load':
279            return None
280        else:
281            size = 0
282            rows = 0
283            if is_tmp:
284                self.cur.execute('select floor((data_length + index_length - data_free) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
285            else:
286                self.cur.execute('select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
287
288            res=self.cur.fetchone()
289            size = float(res[0])
290            rows = int(res[1])
291            return (int(size),rows)
292
293    def close_db(self):
294        if self.conn:
295            self.conn.close()
296            printea("Closed connection {}".format(self.t),'info')
297
298    def reduce_flavour(self,version,flavour):
299        if version == '15':
300            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
301                return 'desktop'
302            elif 'server' in flavour:
303                return 'server'
304            elif 'client' in flavour:
305                return 'client' 
306        elif version == '16':
307            if 'server' in flavour:
308                return 'server'
309            elif 'client' in flavour:
310                return 'client'
311            elif 'desktop' in flavour:
312                return 'desktop'
313        return 'other'
314
315    def reduce_version(self,version):
316        if version[0:2] in ['15','16']:
317            return version[0:2]
318        else:
319            return 'other'
320
321    def gen_uuid(self,*args,**kwargs):
322        try:
323            string=to_utf(u'-'.join([str(x) for x in args]))
324            h=hashlib.sha1(string)
325            return int(h.hexdigest()[0:16],16)
326        except Exception as e:
327            print(traceback.format_exc())
328
329    @timed
330    @with_retry
331    def get_client(self,*args,**kwargs):
332        try:
333            query="SELECT id,date,user,version,sabor from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
334            self.execute(query=query)
335            ret =[]
336            if self.cur.rowcount > 0:
337                for i in range(self.cur.rowcount):
338                    v_id,v_date,v_user,v_version,v_flavour=self.cur.fetchone()
339                    version=self.reduce_version(v_version)
340                    flavour=self.reduce_flavour(version,v_flavour)
341                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
342                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour})
343                return ret
344            else:
345                return True
346        except Exception as e:
347            raise Exception("Error getting client: {}".format(e))
348
349    @timed
350    @with_retry
351    def get_apps(self,*args,**kwargs):
352        if 'clients' not in kwargs:
353            printea("Warning executed without named parameter clients",'info')
354            return None
355        ret = {}
356        try:
357            cids_to_rel_fla={}
358            for c in kwargs['clients']:
359                if c['id'] not in cids_to_rel_fla:
360                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
361                if c['flavour'] not in ret:
362                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
363            for c in kwargs['clients']:
364                ret[c['flavour']]['clients'].append(c)
365
366            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
367            self.execute(query=query)
368            if self.cur.rowcount > 0:
369                for i in range(self.cur.rowcount):
370                    row = self.cur.fetchone()
371                    clid = row[0]
372                    rel,fla = cids_to_rel_fla[clid]
373                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
374                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
375            return ret
376        except Exception as e:
377            raise Exception("Error getting apps: {}".format(e))
378
379    @timed
380    @with_retry
381    def put_client(self,*args,**kwargs):
382        if 'client_list' not in kwargs:
383            raise Exception("Called without named parameter client_list")
384        values= []
385        for cli in kwargs['client_list']:
386            values.append("({},'{}','{}','{}','{}','{}','{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour']))
387        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
388        self.execute(query=query)
389        return True
390
391    @timed
392    @with_retry
393    def put_apps(self,*args,**kwargs):
394        if 'apps' not in kwargs:
395            raise Exception("Called without named parameter apps")
396        app_list = {}
397        for app in kwargs['apps']:
398            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
399            if str(app['uuid']) not in app_list:
400                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
401            else:
402                app_list[str(app['uuid'])]['value']+=app['value']
403        values = []
404        for app in app_list:
405            item=app_list[app]
406            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
407        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
408        self.execute(query=query)
409        return True
410
411    @timed
412    @with_retry
413    def del_client(self,*args,**kwargs):
414        if 'client_list' not in kwargs:
415            raise Exception("Called without named parameter client_list")
416        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
417        self.execute(query=query)
418        return True
419
420    @timed
421    @with_retry
422    def del_apps(self,*args,**kwargs):
423        if 'client_list' not in kwargs:
424            raise Exception("Called without named parameter client_list")
425        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
426        self.execute(query=query)
427        return True
428
429    @timed
430    def reset_autoinc(self):
431        try:
432            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
433            self.execute(query=query)
434            ainc = self.cur.fetchone()[0]
435            query = "SELECT count(*) from tmp_clients"
436            self.execute(query=query)
437            cli_size=self.cur.fetchone()[0]
438            query = "SELECT count(*) from tmp_packages"
439            self.execute(query=query)
440            pkg_size=self.cur.fetchone()[0]
441            printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
442            if ainc > 65500 and self.mon.all_queues_empty() and cli_size == 0 and pkg_size == 0:
443                query = "TRUNCATE TABLE tmp_clients"
444                self.execute(query=query)
445                query = "TRUNCATE TABLE tmp_packages"
446                self.execute(query=query)
447            return True
448        except Exception as e:
449            raise Exception("Error reseting auto_increment: {}".format(e))
450
451
452    @timed
453    def process_main_thread(self,*args,**kwargs):
454        if self.need_wait:
455            time.sleep(EMPTY_PAUSED_SLEEP)
456        clis=self.get_client(mon=self.mon)
457        if clis == True: #No clients found (empty)
458            self.empty = True # if > 65500 auto_increment was reset
459            self.mon.schedule(event='NO_MORE_CLIENTS')
460            return False
461        #clients found
462        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
463        # if clients found get apps
464        lapps = self.get_apps(clients=clis,mon=self.mon)
465        #lapps can be empty
466        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
467        #If deletion was failed , thread was died
468        lapps_tmp={'apps':[],'clients':[]}
469        for fla in lapps:
470            if THREADED:
471                lapps[fla]['timestamp']=time.time()
472                self.mon.db[fla].q.put(lapps[fla],True)
473            else:
474                lapps_tmp['apps'].extend(lapps[fla]['apps'])
475                lapps_tmp['clients'].extend(lapps[fla]['clients'])
476                self.mon.db['main'].q.put(lapps_tmp,True)
477        #if DEBUG:
478        self.processed+=len(clis)
479        return True
480
481    @timed
482    def process_all_threads(self,*args,**kwargs):
483        lapps=self.q.get(True)
484        #print "Running {}".format(self.t)
485        if THREADED:
486            while (lapps['timestamp'] > self.mon.commited):
487                time.sleep(0.001)
488        if len(lapps['apps']) != 0 and len(lapps['clients']) != 0:
489            #IF FAIL, AFTER RETRIES THREAD DIES
490            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
491                self.q.put(lapps,True) # USELESS
492                return False    # USELESS
493            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
494                self.q.put(lapps,True) # USELESS
495                return False    # USELESS
496            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
497                self.q.put(lapps,True) # USELESS
498                return False    # USELESS
499        #if DEBUG:
500        self.processed+=len(lapps['clients'])
501        return True
502
503    def process(self,*args,**kwargs):
504        keepalive(self.t)
505        # code only for main thread
506        printea("Running thread {}".format(self.t),'debug')
507        if self.t == 'main' and not self.mon.terminate:
508           
509            ret=self.process_main_thread(*args,**kwargs)
510            if ret == False: #No more clients available
511                return True
512            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
513                if THREADED:
514                    return True #Need return to avoid put empty flag and wait into main thread
515
516        #after this poing, code for all threads
517        if not self.q.empty():
518            ret=self.process_all_threads(*args,**kwargs)
519            if ret == False: # USELESS? THREADS was died
520                printea("Error threads")
521                return ret
522        else: 
523            del self.q
524            self.q = queue.Queue()
525            self.empty = True
526        return True
527
528    def worker(self):
529        printea("Starting worker {} processing".format(self.t),'info')
530        while not (self.mon.terminate and self.empty):
531            if self.mon.paused or self.empty:
532                if self.empty and self.t == 'main':
533                    self.reset_autoinc()
534                if self.mon.paused:
535                    printea("Paused by high load {}".format(self.t),'debug')
536                if self.empty:
537                    printea("Empty queue {} sleeping by now".format(self.t),'debug')
538                if self.empty:
539                    self.empty = False
540                time.sleep(EMPTY_PAUSED_SLEEP)
541            else:
542                try:
543                    if self.conn == None:
544                        raise Exception('Connection not available, probably it\'s a first run')
545                    else:
546                        self.conn.begin()
547                    if self.process():
548                        self.conn.commit()
549                        self.mon.commited=time.time()
550                        self.reconnect = 0
551                        if self.need_clean:
552                            gc.collect()
553                    else:
554                        self.conn.rollback()
555                except Exception as e:
556                    try:
557                        if self.conn != None:
558                            self.conn.rollback()
559                    except:
560                        printea("Can't rollback last actions",'info')
561                        pass
562                    #if e[0] != 2006:
563                    #    printea("Exception processing worker({}): {}".format(self.t,e))
564                    #if e[0] == 2006: # SERVER GONE AWAY
565                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
566                    printea("Trying to recover connection ({})".format(self.t))
567                    if self.reconnect == 100:
568                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
569                        self.mon.term()
570                    else:
571                        self.reconnect+=1
572                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
573                        time.sleep(self.reconnect*self.reconnect)
574                        try:
575                            self.init_db()
576                            printea("Recovered worker {} connection".format(self.t),'info')
577                        except:
578                            printea('Unable to initialize worker {}'.format(self.t))
579                            pass
580
581
582class Monitor():
583    def __init__(self):
584        self.MAX_QUEUE_UTILIZATION = 100
585        self.USE_MAX_QUEUES = False
586        self.MAX_SELECT_WINDOW = (2 ** 13) +1
587        self.MIN_SELECT_WINDOW = 32
588        self.MEM_USED=0
589        self.MAX_MEM=512
590        self.MIN_FREE_MEM_SERVER=100
591        self.USE_MAX_MEM=True
592        self.lock = Lock()
593        self.terminate = False
594        self.finished = False
595        self.paused = False
596        self.select_window = self.MIN_SELECT_WINDOW
597        self.commited = time.time()
598        self.procesed = 0
599        self.procesed_per_sec = [0]*10
600        self.load = False
601        self.server_variables = None  # initialized by main worker
602        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
603        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
604        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
605        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
606
607        self.server_mem = None
608        self.loadlist = [ 0.0 ] * 100
609        signal.signal(signal.SIGQUIT,self.term)
610        signal.signal(signal.SIGTERM,self.term)
611        signal.signal(signal.SIGINT,self.term)
612
613        self.db = {}
614        self.threads = {}
615        for x in THREADS:
616            self.db[x] = DB(self,x)
617            #try:
618            #    self.db[x].init_db()
619            #except Exception as e:
620            #    printea('Error initializing database connections: {}'.format(str(e)))
621            #    sys.exit(1)
622        self.cfg= None
623        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
624
625    def schedule(self,*args,**kwargs):
626        if kwargs['event']=='NO_MORE_CLIENTS':
627            if self.select_window > self.MIN_SELECT_WINDOW:
628                self.select_window/=2
629                self.select_window=int(self.select_window)
630        if kwargs['event']=='HAVE_CLIENTS':
631            if kwargs['nclients'] == self.select_window:
632                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
633                    if self.select_window > self.MIN_SELECT_WINDOW:
634                        self.select_window/=2
635                        self.select_window=int(self.select_window)
636                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
637                    if self.select_window > self.MIN_SELECT_WINDOW:
638                        self.select_window/=2
639                        self.select_window=int(self.select_window)
640                else:
641                    if self.select_window*2 < self.MAX_SELECT_WINDOW:
642                        self.select_window*=2
643                        self.select_window=int(self.select_window)
644            elif kwargs['nclients']<self.select_window/2:
645                self.select_window/=2
646                self.select_window=int(self.select_window)
647
648        if kwargs['event']=='CHECK_SANITY':
649            #nprocs=multiprocessing.cpu_count()
650            #limit=nprocs*LIMIT
651
652            self.load=self.get_cpu_load()
653            if not self.terminate and self.load > LIMIT:
654                self.paused = True
655            else:
656                self.paused = False
657
658            self.MEM_USED=self.get_mem_usage()
659            self.server_mem=self.get_server_free_mem()
660
661            max_heap=None
662            if self.server_variables and 'max_heap_table_size' in self.server_variables:
663                max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
664            self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
665            self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
666            self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
667            self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
668            if max_heap and self.temporary_tables_size['sum']:
669                if self.temporary_tables_size['sum'] > max_heap * 0.4:
670                    if self.select_window*2 < self.MAX_SELECT_WINDOW:
671                        self.select_window *= 2
672                        self.select_window=int(self.select_window)
673                    if self.paused:
674                        #printea('Hitting max temporary table size unpausing','critical')
675                        self.paused = False
676            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
677                #printea('Hitting max memory from server collecting and reducing window','critical')
678                if self.select_window > self.MIN_SELECT_WINDOW:
679                    self.select_window/=2
680                    self.select_window=int(self.select_window)
681                self.db['main'].need_wait=True
682                self.USE_MAX_QUEUES=True
683                for x in THREADS:
684                    self.db[x].need_clean=True
685                gc.collect()
686            else:
687                self.db['main'].need_wait=False
688                self.USE_MAX_QUEUES=False
689                for x in THREADS:
690                    self.db[x].need_clean=True
691
692    def get_config(self):
693        pass
694
695    def get_cpu_load(self):
696        self.loadlist.append(psutil.cpu_percent())
697        self.loadlist=self.loadlist[1:]
698        avg=0.0
699        for x in self.loadlist:
700            avg+=x
701        return round(avg/100.0,2)
702
703    def put_config(self,key='',value=''):
704        pass
705
706    def term(self,*args,**kwargs):
707        printea("Begin kill the program, wait please...",'info')
708        self.terminate=True
709
710    def prepare_threads(self):
711        global DAEMON
712
713        self.threads['load']=Process(target=self.get_load)
714        self.threads['load'].daemon = DAEMON
715        self.threads['load'].start()
716        for x in THREADS:
717            self.threads[x]=Process(target=self.db[x].worker)
718            self.threads[x].daemon = DAEMON
719            self.threads[x].start()
720
721    def get_mem_usage(self):
722        process = psutil.Process(os.getpid())
723        mem = process.memory_info()[0] / float(2 ** 20)
724        return mem
725
726    def get_server_free_mem(self):
727        mem = psutil.virtual_memory()
728        return mem.free / (2 ** 20)
729
730    def print_stats(self):
731        global CHECK_LOAD_TIME
732#        if DEBUG:
733        if True:
734            out="Processed: "
735            out2=''
736            total=0
737            for x in THREADS:
738                out2+='{}={} '.format(x,self.db[x].processed)
739                self.cfg.store('processed '+x,self.db[x].processed)
740                if THREADED:
741                    if x != 'main':
742                        total += self.db[x].processed
743                else:
744                    total += self.db[x].processed
745            out += "TOTAL={} {}".format(total,out2)
746            self.cfg.store('processed total',total)
747            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
748            self.procesed_per_sec.append(proc_per_sec)
749            self.procesed_per_sec=self.procesed_per_sec[-10:]
750            f=lambda x,y:x+y
751            suma_proc=reduce(f,self.procesed_per_sec)
752            self.cfg.store('processing at',int(suma_proc/10))
753            self.procesed=total
754            total = 0
755            if THREADED:
756                out+="Queues: "
757                sizes=self.get_q_sizes()
758                out2=''
759                for x in sizes:
760                    self.cfg.store('queue '+x,sizes[x])
761                    out2+='{}={} '.format(x,sizes[x])
762                    total += sizes[x]
763                self.cfg.store('queue totals',total)
764                out+="TOTAL={} {}".format(total,out2)
765            self.cfg.store('select window',self.select_window)
766            self.cfg.store('mem used',self.MEM_USED)
767            self.cfg.store('load',self.load)
768            if (self.paused):
769                self.cfg.store('paused',1)
770            else:
771                self.cfg.store('paused',0)
772            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
773            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
774            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
775            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
776            if DEBUG:
777                if THREADED:
778                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
779                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
780                else:
781                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
782                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
783
784    def get_q_sizes(self):
785        sizes={}
786        for x in THREADS:
787            sizes[x]=self.db[x].q.qsize()
788        return sizes
789
790    def sum_q_sizes(self):
791        sizes=self.get_q_sizes()
792        f=lambda x,y: x+y
793        return reduce(f,[sizes[x] for x in sizes])
794
795    def all_queues_empty(self):
796        sizes=self.get_q_sizes()
797        for x in sizes:
798            if sizes[x] != 0:
799                return False
800        return True
801
802    def get_load(self):
803        # runs on separate thread
804        db = DB(self,'load')
805        db.init_db()
806        self.cfg = Config(db)
807        ctime=0
808        while not (self.terminate and self.finished): #and not self.paused
809            self.cfg.store('keepalive',int(time.time()))
810            time.sleep(0.5)
811            ctime+=0.5
812            self.schedule(event='CHECK_SANITY')
813
814            if ctime >CHECK_LOAD_TIME:
815                self.cfg.write()
816                ctime=0.0
817                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size('tmp_clients',True)
818                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size('tmp_packages',True)
819                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size('Client_Versions')
820                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size('RecvPackages')
821                self.print_stats()
822                #self.load=os.getloadavg()[0]
823                if self.all_queues_empty():
824                    gc.collect()
825            #end if
826        #end while
827        db.close_db()
828
829    def start(self):
830        self.prepare_threads()
831
832    def end(self):
833        for x in self.db:
834            self.threads[x].join()
835            self.db[x].close_db()
836        self.finished = True
837        self.print_stats()
838
839
840class Config():
841    def __init__(self,connection):
842        self._db = connection
843        self.read()
844
845    def store(self,var,value):
846        var=var.replace(' ','_')
847        if isinstance(value,str):
848            if value.isnumeric():
849                setattr(self,var,int(value))
850            else:
851                setattr(self,var,str(value))
852        else:
853            setattr(self,var,int(value))
854
855    def write(self):
856        if self._db:
857            values={}
858            for x in self.get_internal_vars():
859                values.setdefault(str(x),str(getattr(self,x)))
860            self._db.put_config(values)
861
862    def read(self):
863        if self._db:
864            config=self._db.get_config()
865            for key in config.keys():
866                if config[key].isnumeric():
867                    setattr(self,key,int(config[key]))
868                else:
869                    setattr(self,key,config[key])
870        else:
871            print('No config yet')
872
873    def get_internal_vars(self):
874        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
875
876    def print(self):
877        for v in self.get_internal_vars():
878            print('{} = {}'.format(v,getattr(self,v)))
879
880
881def main(*args,**kwargs):
882    gc.enable()
883    if DAEMON:
884        fp = open('/var/run/analyticsd.pid','w');
885        fp.write(str(os.getpid()))
886        fp.close()
887    m = Monitor()
888    m.start()
889    printea("start done",'info')
890    while not m.terminate:
891        time.sleep(0.5)
892    m.end()
893    printea("Exitting...",'info')
894
895if __name__ == "__main__":
896    size=len([ p.name() for p in psutil.process_iter() if p.name() == 'analyticsd' ])
897    if size > 1:
898        printea("Another daemon running... exitting now!")
899        sys.exit(1)
900    if DAEMON:
901        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]):
902            main()
903    else:
904        main()
Note: See TracBrowser for help on using the repository browser.