source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 5932

Last change on this file since 5932 was 5932, checked in by mabarracus, 2 years ago

Fixed initialization

  • Property svn:executable set to *
File size: 25.0 KB
Line 
1#!/usr/bin/python
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import Queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24
25##### START EDITABLE VARS #####
26
27DEBUG=False
28DAEMON=True
29FILE='/usr/lib/analytics-server/analytics/config.php'
30LIMIT=5.0
31
32##### END EDITABLE VARS #####
33
34DEBUG_PRINT_ALIVE_COUNTER=10
35THREADED=True
36
37if DEBUG:
38    loglevel=logging.DEBUG
39else:
40    loglevel=logging.INFO
41
42LOGGING = {
43    'version': 1,
44    'disable_existing_loggers': False,
45    'formatters': {
46        'verbose': {
47            'format': '%(levelname)s %(module)s %(message)s'
48            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
49            },
50        },
51    'handlers': {
52        'stdout': {
53            'class': 'logging.StreamHandler',
54            'stream': sys.stdout,
55            'formatter': 'verbose',
56            },
57        'sys-logger6': {
58            'class': 'logging.handlers.SysLogHandler',
59            'address': '/dev/log',
60            'facility': "local6",
61            'formatter': 'verbose',
62            },
63        },
64    'loggers': {
65        'analyticsd-logger': {
66            'handlers': ['sys-logger6','stdout'],
67            'level': loglevel,
68            'propagate': True,
69            },
70        }
71    }
72
73
74def printea(msg="",level='critical'):
75    if level == 'critical':
76        logger.critical(msg)
77    elif level == 'info':
78        logger.info(msg)
79    else:
80        logger.debug(msg)
81
82def keepalive(who=""):
83    global DEBUG_PRINT_ALIVE_COUNTER
84    t=str(int(time.time()))
85    if DEBUG:
86        printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
87    else:
88        if who == 'main':
89            if DEBUG_PRINT_ALIVE_COUNTER > 0:
90                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
91            else:
92                DEBUG_PRINT_ALIVE_COUNTER=10
93                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
94
95    fp = open('/var/run/analyticsd.keepalive','w');
96    fp.write(t)
97    fp.close()
98
99config.dictConfig(LOGGING)
100logger = logging.getLogger('analyticsd-logger')
101
102DBNAME=None
103USER=None
104PASS=None
105IP=None
106
107
108EMPTY_PAUSED_SLEEP=10
109CHECK_LOAD_TIME=5
110MAX_RETRIES=5
111TIMED_ON=False
112
113try:
114    with open(FILE,'r') as f:
115        for line in f:
116            if not IP:
117                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
118                if IP:
119                    IP = IP.group(1)
120            if not DBNAME:
121                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
122                if DBNAME:
123                    DBNAME = DBNAME.group(1)
124            if not USER:
125                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
126                if USER:
127                    USER = USER.group(1)
128            if not PASS:
129                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
130                if PASS:
131                    PASS = PASS.group(1)
132    if not (IP or DBNAME or USER or PASS):
133        printea("Couldn't get database configuration from {}".format(FILE))
134    else:
135        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
136except Exception as e:
137    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
138    sys.exit(1)
139
140if not (IP or DBNAME or USER or PASS):
141    printea("Couldn't get database configuration from {}".format(FILE))
142    sys.exit(1)
143
144if THREADED:
145    THREADS=['main','server','client','desktop','other']
146else:
147    THREADS=['main']
148
149class DB():
150    def __init__(self,mon,t='main'):
151        self.t=t
152        self.reconnect=1
153        self.empty = False
154        self.conn=None
155        self.mon=mon
156        self.q=Queue.Queue()
157        self.processed=0
158        printea('Database worker {} initialized'.format(t),'info')
159
160    def timed(func):
161        @wraps(func)
162        def wrapper(*args,**kwargs):
163            if TIMED_ON:
164                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
165            ret=func(*args,**kwargs)
166            if TIMED_ON:
167                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
168            return ret
169        return wrapper
170
171    def with_retry(func):
172        @wraps(func)
173        def wrapper(*args,**kwargs):
174            if 'retry' not in kwargs:
175                kwargs['retry']=1
176            if 'mon' not in kwargs:
177                kwargs['mon'] = None
178            try:
179                return func(*args,**kwargs) 
180            except Exception as e:
181                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
182                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
183                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
184                    if kwargs['mon']:
185                        kwargs['mon'].term()
186                        sys.exit(1)
187                    return None
188                else:
189                    time.sleep(kwargs['retry']**2)
190                    kwargs['retry']+=1
191                    return wrapper(*args,**kwargs)
192            return result
193        return wrapper
194
195    def with_debug(func):
196        @wraps(func)
197        def wrapper(*args,**kwargs):
198            if 'query' in kwargs:
199                printea("executing query: {}".format(kwargs['query']),'debug')
200            return func(*args,**kwargs)
201        return wrapper
202
203    @with_debug
204    def execute(self,*args,**kwargs):
205        if 'query' not in kwargs:
206            printea("Warning execute called whithout query",'info')
207            return None
208        try:
209            return self.cur.execute(kwargs['query'])
210        except Exception as e:
211            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
212
213    def init_db(self):
214        try:
215            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
216            self.conn.autocommit(False)
217            self.cur = self.conn.cursor()
218            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
219            printea("Connected succesfully {}".format(self.t),'info')
220
221        except mdb.Error, e:
222            printea("Error {}: {}".format(e.args[0],e.args[1]))
223            raise Exception(e)
224
225    def close_db(self):
226        if self.conn:
227            self.conn.close()
228            printea("Closed connection {}".format(self.t),'info')
229
230    def reduce_flavour(self,version,flavour):
231        if version == '15':
232            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
233                return 'desktop'
234            elif 'server' in flavour:
235                return 'server'
236            elif 'client' in flavour:
237                return 'client' 
238        elif version == '16':
239            if 'server' in flavour:
240                return 'server'
241            elif 'client' in flavour:
242                return 'client'
243            elif 'desktop' in flavour:
244                return 'desktop'
245        return 'other'
246
247    def reduce_version(self,version):
248        if version[0:2] in ['15','16']:
249            return version[0:2]
250        else:
251            return 'other'
252
253    def gen_uuid(self,*args,**kwargs):
254        return int(hashlib.sha1('-'.join([str(x) for x in args])).hexdigest()[0:16],16)
255
256    @timed
257    @with_retry
258    def get_client(self,*args,**kwargs):
259        try:
260            query="SELECT id,date,user,version,sabor from tmp_clients where status=1 LIMIT {}".format(self.mon.select_window)
261            self.execute(query=query)
262            ret =[]
263            if self.cur.rowcount > 0:
264                for i in range(self.cur.rowcount):
265                    v_id,v_date,v_user,v_version,v_flavour=self.cur.fetchone()
266                    version=self.reduce_version(v_version)
267                    flavour=self.reduce_flavour(version,v_flavour)
268                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
269                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour})
270                return ret
271            else:
272                return True
273        except Exception as e:
274            raise Exception("Error getting client: {}".format(e))
275
276    @timed
277    @with_retry
278    def get_apps(self,*args,**kwargs):
279        if 'clients' not in kwargs:
280            printea("Warning executed without named parameter clients",'info')
281            return None
282        ret = {}
283        try:
284            cids_to_rel_fla={}
285            for c in kwargs['clients']:
286                if c['id'] not in cids_to_rel_fla:
287                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
288                if c['flavour'] not in ret:
289                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
290            for c in kwargs['clients']:
291                ret[c['flavour']]['clients'].append(c)
292
293            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
294            self.execute(query=query)
295            if self.cur.rowcount > 0:
296                for i in range(self.cur.rowcount):
297                    row = self.cur.fetchone()
298                    clid = row[0]
299                    rel,fla = cids_to_rel_fla[clid]
300                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
301                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
302            return ret
303        except Exception as e:
304            raise Exception("Error getting apps: {}".format(e))
305
306    @timed
307    @with_retry
308    def put_client(self,*args,**kwargs):
309        if 'client_list' not in kwargs:
310            raise Exception("Called without named parameter client_list")
311        values= []
312        for cli in kwargs['client_list']:
313            values.append("({},'{}','{}','{}','{}','{}','{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour']))
314        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
315        self.execute(query=query)
316        return True
317
318    @timed
319    @with_retry
320    def put_apps(self,*args,**kwargs):
321        if 'apps' not in kwargs:
322            raise Exception("Called without named parameter apps")
323        app_list = {}
324        for app in kwargs['apps']:
325            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
326            if str(app['uuid']) not in app_list:
327                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
328            else:
329                app_list[str(app['uuid'])]['value']+=app['value']
330        values = []
331        for app in app_list:
332            item=app_list[app]
333            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
334        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
335        self.execute(query=query)
336        return True
337
338    @timed
339    @with_retry
340    def del_client(self,*args,**kwargs):
341        if 'client_list' not in kwargs:
342            raise Exception("Called without named parameter client_list")
343        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
344        self.execute(query=query)
345        return True
346
347    @timed
348    @with_retry
349    def del_apps(self,*args,**kwargs):
350        if 'client_list' not in kwargs:
351            raise Exception("Called without named parameter client_list")
352        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
353        self.execute(query=query)
354        return True
355
356    @timed
357    def reset_autoinc(self):
358        try:
359            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_clients'".format(DBNAME)
360            self.execute(query=query)
361            ainc = self.cur.fetchone()[0]
362            query = "SELECT count(*) from tmp_clients"
363            self.execute(query=query)
364            cli_size=self.cur.fetchone()[0]
365            query = "SELECT count(*) from tmp_packages"
366            self.execute(query=query)
367            pkg_size=self.cur.fetchone()[0]
368            printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
369            if ainc > 65500 and self.mon.all_queues_empty() and cli_size == 0 and pkg_size == 0:
370                query = "TRUNCATE TABLE tmp_clients"
371                self.execute(query=query)
372                query = "TRUNCATE TABLE tmp_packages"
373                self.execute(query=query)
374            return True
375        except Exception as e:
376            raise Exception("Error reseting auto_increment: {}".format(e))
377
378
379    @timed
380    def process_main_thread(self,*args,**kwargs):
381        clis=self.get_client(mon=self.mon)
382        if clis == True: #No clients found (empty)
383            self.empty = True # if > 65500 auto_increment was reset
384            self.mon.schedule(event='NO_MORE_CLIENTS')
385            return False
386        #clients found
387        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
388        # if clients found get apps
389        lapps = self.get_apps(clients=clis,mon=self.mon)
390        #lapps can be empty
391        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
392        #If deletion was failed , thread was died
393        lapps_tmp={'apps':[],'clients':[]}
394        for fla in lapps:
395            if THREADED:
396                lapps[fla]['timestamp']=time.time()
397                self.mon.db[fla].q.put(lapps[fla],True)
398            else:
399                lapps_tmp['apps'].extend(lapps[fla]['apps'])
400                lapps_tmp['clients'].extend(lapps[fla]['clients'])
401                self.mon.db['main'].q.put(lapps_tmp,True)
402        if DEBUG:
403            self.processed+=len(clis)
404        return True
405
406    @timed
407    def process_all_threads(self,*args,**kwargs):
408        lapps=self.q.get(True)
409        #print "Running {}".format(self.t)
410        if THREADED:
411            while (lapps['timestamp'] > self.mon.commited):
412                time.sleep(0.001)
413        if len(lapps['apps']) != 0 and len(lapps['clients']) != 0:
414            #IF FAIL, AFTER RETRIES THREAD DIES
415            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
416                self.q.put(lapps,True) # USELESS
417                return False    # USELESS
418            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
419                self.q.put(lapps,True) # USELESS
420                return False    # USELESS
421            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
422                self.q.put(lapps,True) # USELESS
423                return False    # USELESS
424        if DEBUG:
425            self.processed+=len(lapps['clients'])
426        return True
427
428    def process(self,*args,**kwargs):
429        keepalive(self.t)
430        # code only for main thread
431        printea("Running thread {}".format(self.t),'debug')
432        if self.t == 'main' and not self.mon.terminate:
433            ret=self.process_main_thread(*args,**kwargs)
434            if ret == False: #No more clients available
435                return True
436            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
437                if THREADED:
438                    return True #Need return to avoid put empty flag and wait into main thread
439
440        #after this poing, code for all threads
441        if not self.q.empty():
442            ret=self.process_all_threads(*args,**kwargs)
443            if ret == False: # USELESS? THREADS was died
444                printea("Error threads")
445                return ret
446        else: 
447            self.empty = True
448        return True
449
450    def worker(self):
451        printea("Starting worker {} processing".format(self.t),'info')
452        while not (self.mon.terminate and self.empty):
453            if self.mon.paused or self.empty:
454                if self.empty and self.t == 'main':
455                    self.reset_autoinc()
456                if self.mon.paused:
457                    printea("Paused by high load {}".format(self.t),'debug')
458                if self.empty:
459                    printea("Empty queue {} sleeping by now".format(self.t),'debug')
460                if self.empty:
461                    self.empty = False
462                time.sleep(EMPTY_PAUSED_SLEEP)
463            else:
464                try:
465                    if self.conn == None:
466                        raise Exception('Connection not available, probably it\'s a first run')
467                    else:
468                        self.conn.begin()
469                    if self.process():
470                        self.conn.commit()
471                        self.mon.commited=time.time()
472                        self.reconnect = 0
473                    else:
474                        self.conn.rollback()
475                except Exception as e:
476                    try:
477                        if self.conn != None:
478                            self.conn.rollback()
479                    except:
480                        printea("Can't rollback last actions",'info')
481                        pass
482                    if e[0] != 2006:
483                        printea("Exception processing worker({}): {}".format(self.t,e))
484                    if e[0] == 2006: # SERVER GONE AWAY
485                        printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
486                    printea("Trying to recover connection ({})".format(self.t))
487                    if self.reconnect == 100:
488                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
489                        self.mon.term()
490                    else:
491                        self.reconnect+=1
492                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
493                        time.sleep(self.reconnect*self.reconnect)
494                        try:
495                            self.init_db()
496                            printea("Recovered worker {} connection".format(self.t),'info')
497                        except:
498                            printea('Unable to initialize worker {}'.format(self.t))
499                            pass
500
501
502class Monitor():
503    def __init__(self):
504        self.MAX_QUEUE_UTILIZATION = 100
505        self.USE_MAX_QUEUES = False
506        self.MAX_SELECT_WINDOW = (2 ** 12) +1
507        self.MEM_USED=0
508        self.MAX_MEM=512
509        self.USE_MAX_MEM=True
510        self.lock = Lock()
511        self.terminate = False
512        self.finished = False
513        self.paused = False
514        self.select_window = 1
515        self.commited = time.time()
516        self.procesed = 0
517        self.procesed_per_sec = [0]*10
518        signal.signal(signal.SIGQUIT,self.term)
519        signal.signal(signal.SIGTERM,self.term)
520        signal.signal(signal.SIGINT,self.term)
521
522        self.db = {}
523        self.threads = {}
524        for x in THREADS:
525            self.db[x] = DB(self,x)
526            #try:
527            #    self.db[x].init_db()
528            #except Exception as e:
529            #    printea('Error initializing database connections: {}'.format(str(e)))
530            #    sys.exit(1)
531        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
532
533    def schedule(self,*args,**kwargs):
534        if kwargs['event']=='NO_MORE_CLIENTS':
535            if self.select_window > 1:
536                self.select_window/=2
537        if kwargs['event']=='HAVE_CLIENTS':
538            if kwargs['nclients'] == self.select_window:
539                if self.USE_MAX_QUEUES  and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
540                    if self.select_window > 1:
541                        self.select_window/=2
542                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
543                    if self.select_window > 1:
544                        self.select_window/=2
545                else:
546                    if self.select_window*2 < self.MAX_SELECT_WINDOW:
547                        self.select_window*=2
548            elif kwargs['nclients']<self.select_window/2:
549                self.select_window/=2
550
551    def get_config(self):
552        pass
553
554    def put_config(self,key='',value=''):
555        pass
556
557    def term(self,*args,**kwargs):
558        printea("Begin kill the program, wait please...",'info')
559        self.terminate=True
560
561    def prepare_threads(self):
562        global DAEMON
563
564        self.threads['load']=Process(target=self.get_load)
565        self.threads['load'].daemon = DAEMON
566        self.threads['load'].start()
567        for x in THREADS:
568            self.threads[x]=Process(target=self.db[x].worker)
569            self.threads[x].daemon = DAEMON
570            self.threads[x].start()
571
572    def get_mem_usage(self):
573        process = psutil.Process(os.getpid())
574        mem = process.get_memory_info()[0] / float(2 ** 20)
575        return mem
576
577    def print_stats(self):
578        global CHECK_LOAD_TIME
579        if DEBUG:
580            out="Processed: "
581            out2=''
582            total=0
583            for x in THREADS:
584                out2+='{}={} '.format(x,self.db[x].processed)
585                if THREADED:
586                    if x != 'main':
587                        total += self.db[x].processed
588                else:
589                    total += self.db[x].processed
590            out += "TOTAL={} {}".format(total,out2)
591            self.procesed_per_sec.append(int((total-self.procesed)/CHECK_LOAD_TIME))
592            self.procesed_per_sec=self.procesed_per_sec[-10:]
593            f=lambda x,y:x+y
594            suma_proc=reduce(f,self.procesed_per_sec)
595            self.procesed=total
596            total = 0
597            if THREADED:
598                out+="Queues: "
599                sizes=self.get_q_sizes()
600                out2=''
601                for x in sizes:
602                    out2+='{}={} '.format(x,sizes[x])
603                    total += sizes[x]
604                out+="TOTAL={} {}".format(total,out2)
605            if THREADED:
606                printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug')
607                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
608            else:
609                printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug')
610                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
611
612    def get_q_sizes(self):
613        sizes={}
614        for x in THREADS:
615            sizes[x]=self.db[x].q.qsize()
616        return sizes
617
618    def sum_q_sizes(self):
619        sizes=self.get_q_sizes()
620        f=lambda x,y: x+y
621        return reduce(f,[sizes[x] for x in sizes])
622
623    def all_queues_empty(self):
624        sizes=self.get_q_sizes()
625        for x in sizes:
626            if sizes[x] != 0:
627                return False
628        return True
629
630    def get_load(self):
631        nprocs=multiprocessing.cpu_count()
632        limit=nprocs*LIMIT
633        while not (self.terminate and not self.paused and self.finished):
634            self.print_stats()
635            self.load=os.getloadavg()[0]
636            if not self.terminate and self.load > limit:
637                self.paused = True
638            else:
639                self.paused = False
640            if self.all_queues_empty():
641                gc.collect()
642            self.MEM_USED=self.get_mem_usage()
643            time.sleep(CHECK_LOAD_TIME)
644
645    def start(self):
646        self.prepare_threads()
647
648    def end(self):
649        for x in self.db:
650            self.threads[x].join()
651            self.db[x].close_db()
652        self.finished = True
653        self.print_stats()
654
655
656def main(*args,**kwargs):
657    gc.enable()
658    if DAEMON:
659        fp = open('/var/run/analyticsd.pid','w');
660        fp.write(str(os.getpid()))
661        fp.close()
662    m = Monitor()
663    m.start()
664    printea("start done",'info')
665    while not m.terminate:
666        time.sleep(0.5)
667    m.end()
668    printea("Exitting...",'info')
669
670if __name__ == "__main__":
671    if DAEMON:
672        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]):
673            main()
674    else:
675        main()
Note: See TracBrowser for help on using the repository browser.