source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 5926

Last change on this file since 5926 was 5926, checked in by mabarracus, 3 years ago

Fixed init script installation & update procedures
Syslog logging
Fixed reconnection when mysql is not available

  • Property svn:executable set to *
File size: 24.4 KB
Line 
1#!/usr/bin/python
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import Queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24
25##### START EDITABLE VARS #####
26
27DEBUG=False
28DAEMON=True
29FILE='/usr/lib/analytics-server/analytics/config.php'
30LIMIT=5.0
31
32##### END EDITABLE VARS #####
33
34DEBUG_PRINT_ALIVE_COUNTER=10
35THREADED=True
36
37if DEBUG:
38    loglevel=logging.DEBUG
39else:
40    loglevel=logging.INFO
41
42LOGGING = {
43    'version': 1,
44    'disable_existing_loggers': False,
45    'formatters': {
46        'verbose': {
47            'format': '%(levelname)s %(module)s %(message)s'
48            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
49            },
50        },
51    'handlers': {
52        'stdout': {
53            'class': 'logging.StreamHandler',
54            'stream': sys.stdout,
55            'formatter': 'verbose',
56            },
57        'sys-logger6': {
58            'class': 'logging.handlers.SysLogHandler',
59            'address': '/dev/log',
60            'facility': "local6",
61            'formatter': 'verbose',
62            },
63        },
64    'loggers': {
65        'analyticsd-logger': {
66            'handlers': ['sys-logger6','stdout'],
67            'level': loglevel,
68            'propagate': True,
69            },
70        }
71    }
72
73
74def printea(msg="",level='critical'):
75    if level == 'critical':
76        logger.critical(msg)
77    elif level == 'info':
78        logger.info(msg)
79    else:
80        logger.debug(msg)
81
82def keepalive(who=""):
83    global DEBUG_PRINT_ALIVE_COUNTER
84    t=str(int(time.time()))
85    if DEBUG:
86        printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
87    else:
88        if who == 'main':
89            if DEBUG_PRINT_ALIVE_COUNTER > 0:
90                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
91            else:
92                DEBUG_PRINT_ALIVE_COUNTER=10
93                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
94
95    fp = open('/var/run/analyticsd.keepalive','w');
96    fp.write(t)
97    fp.close()
98
99config.dictConfig(LOGGING)
100logger = logging.getLogger('analyticsd-logger')
101
102DBNAME=None
103USER=None
104PASS=None
105IP=None
106
107
108EMPTY_PAUSED_SLEEP=10
109CHECK_LOAD_TIME=5
110MAX_RETRIES=5
111TIMED_ON=False
112
113try:
114    with open(FILE,'r') as f:
115        for line in f:
116            if not IP:
117                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
118                if IP:
119                    IP = IP.group(1)
120            if not DBNAME:
121                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
122                if DBNAME:
123                    DBNAME = DBNAME.group(1)
124            if not USER:
125                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
126                if USER:
127                    USER = USER.group(1)
128            if not PASS:
129                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
130                if PASS:
131                    PASS = PASS.group(1)
132    if not (IP or DBNAME or USER or PASS):
133        printea("Couldn't get database configuration from {}".format(FILE))
134    else:
135        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
136except Exception as e:
137    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
138    sys.exit(1)
139
140if not (IP or DBNAME or USER or PASS):
141    printea("Couldn't get database configuration from {}".format(FILE))
142    sys.exit(1)
143
144if THREADED:
145    THREADS=['main','server','client','desktop','other']
146else:
147    THREADS=['main']
148
149class DB():
150    def __init__(self,mon,t='main'):
151        self.t=t
152        self.empty = False
153        self.conn=None
154        self.mon=mon
155        self.q=Queue.Queue()
156        self.processed=0
157        printea('Database worker {} initialized'.format(t),'info')
158
159    def timed(func):
160        @wraps(func)
161        def wrapper(*args,**kwargs):
162            if TIMED_ON:
163                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
164            ret=func(*args,**kwargs)
165            if TIMED_ON:
166                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
167            return ret
168        return wrapper
169
170    def with_retry(func):
171        @wraps(func)
172        def wrapper(*args,**kwargs):
173            if 'retry' not in kwargs:
174                kwargs['retry']=1
175            if 'mon' not in kwargs:
176                kwargs['mon'] = None
177            try:
178                return func(*args,**kwargs) 
179            except Exception as e:
180                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
181                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
182                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
183                    if kwargs['mon']:
184                        kwargs['mon'].term()
185                        sys.exit(1)
186                    return None
187                else:
188                    time.sleep(kwargs['retry']**2)
189                    kwargs['retry']+=1
190                    return wrapper(*args,**kwargs)
191            return result
192        return wrapper
193
194    def with_debug(func):
195        @wraps(func)
196        def wrapper(*args,**kwargs):
197            if 'query' in kwargs:
198                printea("executing query: {}".format(kwargs['query']),'debug')
199            return func(*args,**kwargs)
200        return wrapper
201
202    @with_debug
203    def execute(self,*args,**kwargs):
204        if 'query' not in kwargs:
205            printea("Warning execute called whithout query",'info')
206            return None
207        try:
208            return self.cur.execute(kwargs['query'])
209        except Exception as e:
210            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
211
212    def init_db(self):
213        try:
214            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
215            self.conn.autocommit(False)
216            self.cur = self.conn.cursor()
217            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
218            printea("Connected succesfully {}".format(self.t),'info')
219
220        except mdb.Error, e:
221            printea("Error {}: {}".format(e.args[0],e.args[1]))
222            raise Exception(e)
223
224    def close_db(self):
225        if self.conn:
226            self.conn.close()
227            printea("Closed connection {}".format(self.t),'info')
228
229    def reduce_flavour(self,version,flavour):
230        if version == '15':
231            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
232                return 'desktop'
233            elif 'server' in flavour:
234                return 'server'
235            elif 'client' in flavour:
236                return 'client' 
237        elif version == '16':
238            if 'server' in flavour:
239                return 'server'
240            elif 'client' in flavour:
241                return 'client'
242            elif 'desktop' in flavour:
243                return 'desktop'
244        return 'other'
245
246    def reduce_version(self,version):
247        if version[0:2] in ['15','16']:
248            return version[0:2]
249        else:
250            return 'other'
251
252    def gen_uuid(self,*args,**kwargs):
253        return int(hashlib.sha1('-'.join([str(x) for x in args])).hexdigest()[0:16],16)
254
255    @timed
256    @with_retry
257    def get_client(self,*args,**kwargs):
258        try:
259            query="SELECT id,date,user,version,sabor from tmp_clients where status=1 LIMIT {}".format(self.mon.select_window)
260            self.execute(query=query)
261            ret =[]
262            if self.cur.rowcount > 0:
263                for i in range(self.cur.rowcount):
264                    v_id,v_date,v_user,v_version,v_flavour=self.cur.fetchone()
265                    version=self.reduce_version(v_version)
266                    flavour=self.reduce_flavour(version,v_flavour)
267                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
268                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour})
269                return ret
270            else:
271                return True
272        except Exception as e:
273            raise Exception("Error getting client: {}".format(e))
274
275    @timed
276    @with_retry
277    def get_apps(self,*args,**kwargs):
278        if 'clients' not in kwargs:
279            printea("Warning executed without named parameter clients",'info')
280            return None
281        ret = {}
282        try:
283            cids_to_rel_fla={}
284            for c in kwargs['clients']:
285                if c['id'] not in cids_to_rel_fla:
286                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
287                if c['flavour'] not in ret:
288                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
289            for c in kwargs['clients']:
290                ret[c['flavour']]['clients'].append(c)
291
292            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
293            self.execute(query=query)
294            if self.cur.rowcount > 0:
295                for i in range(self.cur.rowcount):
296                    row = self.cur.fetchone()
297                    clid = row[0]
298                    rel,fla = cids_to_rel_fla[clid]
299                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
300                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
301            return ret
302        except Exception as e:
303            raise Exception("Error getting apps: {}".format(e))
304
305    @timed
306    @with_retry
307    def put_client(self,*args,**kwargs):
308        if 'client_list' not in kwargs:
309            raise Exception("Called without named parameter client_list")
310        values= []
311        for cli in kwargs['client_list']:
312            values.append("({},'{}','{}','{}','{}','{}','{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour']))
313        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
314        self.execute(query=query)
315        return True
316
317    @timed
318    @with_retry
319    def put_apps(self,*args,**kwargs):
320        if 'apps' not in kwargs:
321            raise Exception("Called without named parameter apps")
322        app_list = {}
323        for app in kwargs['apps']:
324            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
325            if str(app['uuid']) not in app_list:
326                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
327            else:
328                app_list[str(app['uuid'])]['value']+=app['value']
329        values = []
330        for app in app_list:
331            item=app_list[app]
332            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
333        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
334        self.execute(query=query)
335        return True
336
337    @timed
338    @with_retry
339    def del_client(self,*args,**kwargs):
340        if 'client_list' not in kwargs:
341            raise Exception("Called without named parameter client_list")
342        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
343        self.execute(query=query)
344        return True
345
346    @timed
347    @with_retry
348    def del_apps(self,*args,**kwargs):
349        if 'client_list' not in kwargs:
350            raise Exception("Called without named parameter client_list")
351        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
352        self.execute(query=query)
353        return True
354
355    @timed
356    def reset_autoinc(self):
357        try:
358            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_clients'".format(DBNAME)
359            self.execute(query=query)
360            ainc = self.cur.fetchone()[0]
361            query = "SELECT count(*) from tmp_clients"
362            self.execute(query=query)
363            cli_size=self.cur.fetchone()[0]
364            query = "SELECT count(*) from tmp_packages"
365            self.execute(query=query)
366            pkg_size=self.cur.fetchone()[0]
367            printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
368            if ainc > 65500 and self.mon.all_queues_empty() and cli_size == 0 and pkg_size == 0:
369                query = "TRUNCATE TABLE tmp_clients"
370                self.execute(query=query)
371                query = "TRUNCATE TABLE tmp_packages"
372                self.execute(query=query)
373            return True
374        except Exception as e:
375            raise Exception("Error reseting auto_increment: {}".format(e))
376
377
378    @timed
379    def process_main_thread(self,*args,**kwargs):
380        clis=self.get_client(mon=self.mon)
381        if clis == True: #No clients found (empty)
382            self.empty = True # if > 65500 auto_increment was reset
383            self.mon.schedule(event='NO_MORE_CLIENTS')
384            return False
385        #clients found
386        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
387        # if clients found get apps
388        lapps = self.get_apps(clients=clis,mon=self.mon)
389        #lapps can be empty
390        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
391        #If deletion was failed , thread was died
392        lapps_tmp={'apps':[],'clients':[]}
393        for fla in lapps:
394            if THREADED:
395                lapps[fla]['timestamp']=time.time()
396                self.mon.db[fla].q.put(lapps[fla],True)
397            else:
398                lapps_tmp['apps'].extend(lapps[fla]['apps'])
399                lapps_tmp['clients'].extend(lapps[fla]['clients'])
400                self.mon.db['main'].q.put(lapps_tmp,True)
401        if DEBUG:
402            self.processed+=len(clis)
403        return True
404
405    @timed
406    def process_all_threads(self,*args,**kwargs):
407        lapps=self.q.get(True)
408        #print "Running {}".format(self.t)
409        if THREADED:
410            while (lapps['timestamp'] > self.mon.commited):
411                time.sleep(0.001)
412        if len(lapps['apps']) != 0 and len(lapps['clients']) != 0:
413            #IF FAIL, AFTER RETRIES THREAD DIES
414            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
415                self.q.put(lapps,True) # USELESS
416                return False    # USELESS
417            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
418                self.q.put(lapps,True) # USELESS
419                return False    # USELESS
420            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
421                self.q.put(lapps,True) # USELESS
422                return False    # USELESS
423        if DEBUG:
424            self.processed+=len(lapps['clients'])
425        return True
426
427    def process(self,*args,**kwargs):
428        keepalive(self.t)
429        # code only for main thread
430        printea("Running thread {}".format(self.t),'debug')
431        if self.t == 'main' and not self.mon.terminate:
432            ret=self.process_main_thread(*args,**kwargs)
433            if ret == False: #No more clients available
434                return True
435            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
436                if THREADED:
437                    return True #Need return to avoid put empty flag and wait into main thread
438
439        #after this poing, code for all threads
440        if not self.q.empty():
441            ret=self.process_all_threads(*args,**kwargs)
442            if ret == False: # USELESS? THREADS was died
443                printea("Error threads")
444                return ret
445        else: 
446            self.empty = True
447        return True
448
449    def worker(self):
450        while not (self.mon.terminate and self.empty):
451            if self.mon.paused or self.empty:
452                if self.empty and self.t == 'main':
453                    self.reset_autoinc()
454                if self.mon.paused:
455                    printea("Paused by high load {}".format(self.t),'debug')
456                if self.empty:
457                    printea("Empty queue {} sleeping by now".format(self.t),'debug')
458                if self.empty:
459                    self.empty = False
460                time.sleep(EMPTY_PAUSED_SLEEP)
461            else:
462                try:
463                    self.conn.begin()
464                    if self.process():
465                        self.conn.commit()
466                        self.mon.commited=time.time()
467                        self.reconnect = 0
468                    else:
469                        self.conn.rollback()
470                except Exception as e:
471                    try:
472                        self.conn.rollback()
473                    except:
474                        printea("Can't rollback last actions",'info')
475                        pass
476                    if e[0] != 2006:
477                        printea("Exception processing: {}".format(e))
478                    else:
479                        if self.reconnect == 100:
480                            printea("Lost connection to database")
481                            self.mon.term()
482                        else:
483                            self.reconnect+=1
484                            printea('Reconnect to mysql({}) sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
485                            time.sleep(self.reconnect*self.reconnect)
486                            try:
487                                self.init_db()
488                            except:
489                                printea('Unable to initialize worker {}'.format(self.t))
490                                pass
491
492
493class Monitor():
494    def __init__(self):
495        self.MAX_QUEUE_UTILIZATION = 100
496        self.USE_MAX_QUEUES = False
497        self.MAX_SELECT_WINDOW = (2 ** 12) +1
498        self.MEM_USED=0
499        self.MAX_MEM=512
500        self.USE_MAX_MEM=True
501        self.lock = Lock()
502        self.terminate = False
503        self.finished = False
504        self.paused = False
505        self.select_window = 1
506        self.commited = time.time()
507        self.procesed = 0
508        self.procesed_per_sec = [0]*10
509        signal.signal(signal.SIGQUIT,self.term)
510        signal.signal(signal.SIGTERM,self.term)
511        signal.signal(signal.SIGINT,self.term)
512
513        self.db = {}
514        self.threads = {}
515        for x in THREADS:
516            self.db[x] = DB(self,x)
517            try:
518                self.db[x].init_db()
519            except Exception as e:
520                printea('Error initializing database connections: {}'.format(str(e)))
521                sys.exit(1)
522        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
523
524    def schedule(self,*args,**kwargs):
525        if kwargs['event']=='NO_MORE_CLIENTS':
526            if self.select_window > 1:
527                self.select_window/=2
528        if kwargs['event']=='HAVE_CLIENTS':
529            if kwargs['nclients'] == self.select_window:
530                if self.USE_MAX_QUEUES  and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
531                    if self.select_window > 1:
532                        self.select_window/=2
533                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
534                    if self.select_window > 1:
535                        self.select_window/=2
536                else:
537                    if self.select_window*2 < self.MAX_SELECT_WINDOW:
538                        self.select_window*=2
539            elif kwargs['nclients']<self.select_window/2:
540                self.select_window/=2
541
542    def get_config(self):
543        pass
544
545    def put_config(self,key='',value=''):
546        pass
547
548    def term(self,*args,**kwargs):
549        printea("Begin kill the program, wait please...",'info')
550        self.terminate=True
551
552    def prepare_threads(self):
553        global DAEMON
554
555        self.threads['load']=Process(target=self.get_load)
556        self.threads['load'].daemon = DAEMON
557        self.threads['load'].start()
558        for x in THREADS:
559            self.threads[x]=Process(target=self.db[x].worker)
560            self.threads[x].daemon = DAEMON
561            self.threads[x].start()
562
563    def get_mem_usage(self):
564        process = psutil.Process(os.getpid())
565        mem = process.get_memory_info()[0] / float(2 ** 20)
566        return mem
567
568    def print_stats(self):
569        global CHECK_LOAD_TIME
570        if DEBUG:
571            out="Processed: "
572            out2=''
573            total=0
574            for x in THREADS:
575                out2+='{}={} '.format(x,self.db[x].processed)
576                if THREADED:
577                    if x != 'main':
578                        total += self.db[x].processed
579                else:
580                    total += self.db[x].processed
581            out += "TOTAL={} {}".format(total,out2)
582            self.procesed_per_sec.append(int((total-self.procesed)/CHECK_LOAD_TIME))
583            self.procesed_per_sec=self.procesed_per_sec[-10:]
584            f=lambda x,y:x+y
585            suma_proc=reduce(f,self.procesed_per_sec)
586            self.procesed=total
587            total = 0
588            if THREADED:
589                out+="Queues: "
590                sizes=self.get_q_sizes()
591                out2=''
592                for x in sizes:
593                    out2+='{}={} '.format(x,sizes[x])
594                    total += sizes[x]
595                out+="TOTAL={} {}".format(total,out2)
596            if THREADED:
597                printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug')
598                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
599            else:
600                printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug')
601                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
602
603    def get_q_sizes(self):
604        sizes={}
605        for x in THREADS:
606            sizes[x]=self.db[x].q.qsize()
607        return sizes
608
609    def sum_q_sizes(self):
610        sizes=self.get_q_sizes()
611        f=lambda x,y: x+y
612        return reduce(f,[sizes[x] for x in sizes])
613
614    def all_queues_empty(self):
615        sizes=self.get_q_sizes()
616        for x in sizes:
617            if sizes[x] != 0:
618                return False
619        return True
620
621    def get_load(self):
622        nprocs=multiprocessing.cpu_count()
623        limit=nprocs*LIMIT
624        while not (self.terminate and not self.paused and self.finished):
625            self.print_stats()
626            self.load=os.getloadavg()[0]
627            if not self.terminate and self.load > limit:
628                self.paused = True
629            else:
630                self.paused = False
631            if self.all_queues_empty():
632                gc.collect()
633            self.MEM_USED=self.get_mem_usage()
634            time.sleep(CHECK_LOAD_TIME)
635
636    def start(self):
637        self.prepare_threads()
638
639    def end(self):
640        for x in self.db:
641            self.threads[x].join()
642            self.db[x].close_db()
643        self.finished = True
644        self.print_stats()
645
646
647def main(*args,**kwargs):
648    gc.enable()
649    if DAEMON:
650        fp = open('/var/run/analyticsd.pid','w');
651        fp.write(str(os.getpid()))
652        fp.close()
653    m = Monitor()
654    m.start()
655    printea("start done",'info')
656    while not m.terminate:
657        time.sleep(0.5)
658    m.end()
659    printea("Exitting...",'info')
660
661if __name__ == "__main__":
662    if DAEMON:
663        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]):
664            main()
665    else:
666        main()
Note: See TracBrowser for help on using the repository browser.