source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 5560

Last change on this file since 5560 was 5560, checked in by mabarracus, 3 years ago

Complete code rewrite
New database model
Improved performance & optimization
Extended information about clients
Fixed older bugs
New testing framework
Fix postinst exit code

  • Property svn:executable set to *
File size: 21.6 KB
Line 
1#!/usr/bin/python
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import Queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21
22FILE='/usr/lib/analytics-server/analytics/config.php'
23DBNAME=None
24USER=None
25PASS=None
26IP=None
27
28DAEMON=True
29LIMIT=5.0
30DEBUG=0
31THREADED=True
32EMPTY_PAUSED_SLEEP=10
33CHECK_LOAD_TIME=5
34MAX_RETRIES=5
35TIMED_ON=False
36
37try:
38    with open(FILE,'r') as f:
39        for line in f:
40            if not IP:
41                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
42                if IP:
43                    IP = IP.group(1)
44            if not DBNAME:
45                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
46                if DBNAME:
47                    DBNAME = DBNAME.group(1)
48            if not USER:
49                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
50                if USER:
51                    USER = USER.group(1)
52            if not PASS:
53                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
54                if PASS:
55                    PASS = PASS.group(1)
56    if not (IP or DBNAME or USER or PASS):
57        print "Couldn't get database configuration from {}".format(FILE)
58    else:
59        if DEBUG:
60            print "Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS);
61except Exception as e:
62    print "Couldn't parse {} Error:\"{}\"".format(FILE,str(e))
63    sys.exit(1)
64
65if not (IP or DBNAME or USER or PASS):
66    print "Couldn't get database configuration from {}".format(FILE)
67    sys.exit(1)
68
69if THREADED:
70    THREADS=['main','server','client','desktop','other']
71else:
72    THREADS=['main']
73
74class DB():
75    def __init__(self,mon,t='main'):
76        self.t=t
77        self.empty = False
78        self.conn=None
79        self.mon=mon
80        self.q=Queue.Queue()
81        self.processed=0
82
83    def timed(func):
84        @wraps(func)
85        def wrapper(*args,**kwargs):
86            if TIMED_ON:
87                print "Start({}): @{}".format(func.__name__,time.time())
88            ret=func(*args,**kwargs)
89            if TIMED_ON:
90                print "End  ({}): @{}".format(func.__name__,time.time())
91            return ret
92        return wrapper
93
94    def with_retry(func):
95        @wraps(func)
96        def wrapper(*args,**kwargs):
97            if 'retry' not in kwargs:
98                kwargs['retry']=1
99            if 'mon' not in kwargs:
100                kwargs['mon'] = None
101            try:
102                return func(*args,**kwargs) 
103            except Exception as e:
104                print "Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e))
105                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
106                    print "Fatal error in ({}), max retries exceded".format(func.__name__)
107                    if kwargs['mon']:
108                        kwargs['mon'].term()
109                        sys.exit(1)
110                    return None
111                else:
112                    time.sleep(kwargs['retry']**2)
113                    kwargs['retry']+=1
114                    return wrapper(*args,**kwargs)
115            return result
116        return wrapper
117
118    def with_debug(func):
119        @wraps(func)
120        def wrapper(*args,**kwargs):
121            if DEBUG and DEBUG > 2:
122                if 'query' in kwargs:
123                    print "executing query: {}".format(kwargs['query'])
124            return func(*args,**kwargs)
125        return wrapper
126
127    @with_debug
128    def execute(self,*args,**kwargs):
129        if 'query' not in kwargs:
130            print "Warning execute called whithout query"
131            return None
132        try:
133            return self.cur.execute(kwargs['query'])
134        except Exception as e:
135            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
136
137    def init_db(self):
138        try:
139            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
140            self.conn.autocommit(False)
141            self.cur = self.conn.cursor()
142            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
143            if DEBUG:
144                print "Connected succesfully {}".format(self.t)
145
146        except mdb.Error, e:
147            print "Error {}: {}".format(e.args[0],e.args[1])
148            raise Exception(e)
149
150    def close_db(self):
151        if self.conn:
152            self.conn.close()
153        if DEBUG:
154            print "Closed connection {}".format(self.t)
155
156    def reduce_flavour(self,version,flavour):
157        if version == '15':
158            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
159                return 'desktop'
160            elif 'server' in flavour:
161                return 'server'
162            elif 'client' in flavour:
163                return 'client' 
164        elif version == '16':
165            if 'server' in flavour:
166                return 'server'
167            elif 'client' in flavour:
168                return 'client'
169            elif 'desktop' in flavour:
170                return 'desktop'
171        return 'other'
172
173    def reduce_version(self,version):
174        if version[0:2] in ['15','16']:
175            return version[0:2]
176        else:
177            return 'other'
178
179    def gen_uuid(self,*args,**kwargs):
180        return int(hashlib.sha1('-'.join([str(x) for x in args])).hexdigest()[0:16],16)
181
182    @timed
183    @with_retry
184    def get_client(self,*args,**kwargs):
185        try:
186            query="SELECT id,date,user,version,sabor from tmp_clients where status=1 LIMIT {}".format(self.mon.select_window)
187            self.execute(query=query)
188            ret =[]
189            if self.cur.rowcount > 0:
190                for i in range(self.cur.rowcount):
191                    v_id,v_date,v_user,v_version,v_flavour=self.cur.fetchone()
192                    version=self.reduce_version(v_version)
193                    flavour=self.reduce_flavour(version,v_flavour)
194                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
195                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour})
196                return ret
197            else:
198                return True
199        except Exception as e:
200            raise Exception("Error getting client: {}".format(e))
201
202    @timed
203    @with_retry
204    def get_apps(self,*args,**kwargs):
205        if 'clients' not in kwargs:
206            print "Warning executed without named parameter clients"
207            return None
208        ret = {}
209        try:
210            cids_to_rel_fla={}
211            for c in kwargs['clients']:
212                if c['id'] not in cids_to_rel_fla:
213                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
214                if c['flavour'] not in ret:
215                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
216            for c in kwargs['clients']:
217                ret[c['flavour']]['clients'].append(c)
218
219            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
220            self.execute(query=query)
221            if self.cur.rowcount > 0:
222                for i in range(self.cur.rowcount):
223                    row = self.cur.fetchone()
224                    clid = row[0]
225                    rel,fla = cids_to_rel_fla[clid]
226                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
227                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
228            return ret
229        except Exception as e:
230            raise Exception("Error getting apps: {}".format(e))
231
232    @timed
233    @with_retry
234    def put_client(self,*args,**kwargs):
235        if 'client_list' not in kwargs:
236            raise Exception("Called without named parameter client_list")
237        values= []
238        for cli in kwargs['client_list']:
239            values.append("({},'{}','{}','{}','{}','{}','{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour']))
240        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
241        self.execute(query=query)
242        return True
243
244    @timed
245    @with_retry
246    def put_apps(self,*args,**kwargs):
247        if 'apps' not in kwargs:
248            raise Exception("Called without named parameter apps")
249        app_list = {}
250        for app in kwargs['apps']:
251            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
252            if str(app['uuid']) not in app_list:
253                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
254            else:
255                app_list[str(app['uuid'])]['value']+=app['value']
256        values = []
257        for app in app_list:
258            item=app_list[app]
259            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
260        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
261        self.execute(query=query)
262        return True
263
264    @timed
265    @with_retry
266    def del_client(self,*args,**kwargs):
267        if 'client_list' not in kwargs:
268            raise Exception("Called without named parameter client_list")
269        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
270        self.execute(query=query)
271        return True
272
273    @timed
274    @with_retry
275    def del_apps(self,*args,**kwargs):
276        if 'client_list' not in kwargs:
277            raise Exception("Called without named parameter client_list")
278        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
279        self.execute(query=query)
280        return True
281
282    @timed
283    def reset_autoinc(self):
284        try:
285            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_clients'".format(DBNAME)
286            self.execute(query=query)
287            ainc = self.cur.fetchone()[0]
288            query = "SELECT count(*) from tmp_clients"
289            self.execute(query=query)
290            cli_size=self.cur.fetchone()[0]
291            query = "SELECT count(*) from tmp_packages"
292            self.execute(query=query)
293            pkg_size=self.cur.fetchone()[0]
294            if DEBUG and DEBUG > 1:
295                print "Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty())
296            if ainc > 65500 and self.mon.all_queues_empty() and cli_size == 0 and pkg_size == 0:
297                query = "TRUNCATE TABLE tmp_clients"
298                self.execute(query=query)
299                query = "TRUNCATE TABLE tmp_packages"
300                self.execute(query=query)
301            return True
302        except Exception as e:
303            raise Exception("Error reseting auto_increment: {}".format(e))
304
305
306    @timed
307    def process_main_thread(self,*args,**kwargs):
308        clis=self.get_client(mon=self.mon)
309        if clis == True: #No clients found (empty)
310            self.empty = True # if > 65500 auto_increment was reset
311            self.mon.schedule(event='NO_MORE_CLIENTS')
312            return False
313        #clients found
314        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
315        # if clients found get apps
316        lapps = self.get_apps(clients=clis,mon=self.mon)
317        #lapps can be empty
318        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
319        #If deletion was failed , thread was died
320        lapps_tmp={'apps':[],'clients':[]}
321        for fla in lapps:
322            if THREADED:
323                lapps[fla]['timestamp']=time.time()
324                self.mon.db[fla].q.put(lapps[fla],True)
325            else:
326                lapps_tmp['apps'].extend(lapps[fla]['apps'])
327                lapps_tmp['clients'].extend(lapps[fla]['clients'])
328                self.mon.db['main'].q.put(lapps_tmp,True)
329        if DEBUG:
330            self.processed+=len(clis)
331        return True
332
333    @timed
334    def process_all_threads(self,*args,**kwargs):
335        lapps=self.q.get(True)
336        #print "Running {}".format(self.t)
337        if THREADED:
338            while (lapps['timestamp'] > self.mon.commited):
339                time.sleep(0.001)
340        if len(lapps['apps']) != 0 and len(lapps['clients']) != 0:
341            #IF FAIL, AFTER RETRIES THREAD DIES
342            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
343                self.q.put(lapps,True) # USELESS
344                return False    # USELESS
345            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
346                self.q.put(lapps,True) # USELESS
347                return False    # USELESS
348            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
349                self.q.put(lapps,True) # USELESS
350                return False    # USELESS
351        if DEBUG:
352            self.processed+=len(lapps['clients'])
353        return True
354
355    def process(self,*args,**kwargs):
356        # code only for main thread
357        if DEBUG and DEBUG > 1:
358            print "Running thread {}".format(self.t)
359        if self.t == 'main' and not self.mon.terminate:
360            ret=self.process_main_thread(*args,**kwargs)
361            if ret == False: #No more clients available
362                return True
363            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
364                if THREADED:
365                    return True #Need return to avoid put empty flag and wait into main thread
366
367        #after this poing, code for all threads
368        if not self.q.empty():
369            ret=self.process_all_threads(*args,**kwargs)
370            if ret == False: # USELESS? THREADS was died
371                print "Error threads"
372                return ret
373        else: 
374            self.empty = True
375        return True
376
377    def worker(self):
378        while not (self.mon.terminate and self.empty):
379            if self.mon.paused or self.empty:
380                if self.empty and self.t == 'main':
381                    self.reset_autoinc()
382                if DEBUG and DEBUG > 1 and self.mon.paused:
383                    print "Paused by high load {}".format(self.t)
384                if DEBUG and DEBUG > 1 and self.empty:
385                    print "Empty queue {} sleeping by now".format(self.t)
386                if self.empty:
387                    self.empty = False
388                time.sleep(EMPTY_PAUSED_SLEEP)
389            else:
390                self.conn.begin()
391                try:
392                    if self.process():
393                        self.conn.commit()
394                        self.mon.commited=time.time()
395                        self.reconnect = 0
396                    else:
397                        self.conn.rollback()
398                except Exception as e:
399                    self.conn.rollback()
400                    if e[0] != 2006:
401                        print "Exception processing: {}".format(e)
402                    else:
403                        if self.reconnect == 100:
404                            print "Lost connection to database"
405                            self.mon.term()
406                        else:
407                            self.reconnect+=1
408                            time.sleep(self.reconnect*2)
409                            try:
410                                self.init_db()
411                            except:
412                                pass
413
414
415class Monitor():
416    def __init__(self):
417        self.MAX_QUEUE_UTILIZATION = 100
418        self.USE_MAX_QUEUES = False
419        self.MAX_SELECT_WINDOW = (2 ** 12) +1
420        self.MEM_USED=0
421        self.MAX_MEM=512
422        self.USE_MAX_MEM=True
423        self.lock = Lock()
424        self.terminate = False
425        self.finished = False
426        self.paused = False
427        self.select_window = 1
428        self.commited = time.time()
429        self.procesed = 0
430        self.procesed_per_sec = [0]*10
431        signal.signal(signal.SIGQUIT,self.term)
432        signal.signal(signal.SIGTERM,self.term)
433        signal.signal(signal.SIGINT,self.term)
434
435        self.db = {}
436        self.threads = {}
437        for x in THREADS:
438            self.db[x] = DB(self,x)
439            try:
440                self.db[x].init_db()
441            except Exception as e:
442                print 'Error initializing database connections: {}'.format(str(e))
443                sys.exit(1)
444
445    def schedule(self,*args,**kwargs):
446        if kwargs['event']=='NO_MORE_CLIENTS':
447            if self.select_window > 1:
448                self.select_window/=2
449        if kwargs['event']=='HAVE_CLIENTS':
450            if kwargs['nclients'] == self.select_window:
451                if self.USE_MAX_QUEUES  and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
452                    if self.select_window > 1:
453                        self.select_window/=2
454                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
455                    if self.select_window > 1:
456                        self.select_window/=2
457                else:
458                    if self.select_window*2 < self.MAX_SELECT_WINDOW:
459                        self.select_window*=2
460            elif kwargs['nclients']<self.select_window/2:
461                self.select_window/=2
462
463    def get_config(self):
464        pass
465
466    def put_config(self,key='',value=''):
467        pass
468
469    def term(self,*args,**kwargs):
470        print "Begin kill the program, wait please..."
471        self.terminate=True
472
473    def prepare_threads(self):
474        global DAEMON
475
476        self.threads['load']=Process(target=self.get_load)
477        self.threads['load'].daemon = DAEMON
478        self.threads['load'].start()
479        for x in THREADS:
480            self.threads[x]=Process(target=self.db[x].worker)
481            self.threads[x].daemon = DAEMON
482            self.threads[x].start()
483
484    def get_mem_usage(self):
485        process = psutil.Process(os.getpid())
486        mem = process.get_memory_info()[0] / float(2 ** 20)
487        return mem
488
489    def print_stats(self):
490        global CHECK_LOAD_TIME
491        if DEBUG:
492            out="Processed: "
493            out2=''
494            total=0
495            for x in THREADS:
496                out2+='{}={} '.format(x,self.db[x].processed)
497                if THREADED:
498                    if x != 'main':
499                        total += self.db[x].processed
500                else:
501                    total += self.db[x].processed
502            out += "TOTAL={} {}".format(total,out2)
503            self.procesed_per_sec.append(int((total-self.procesed)/CHECK_LOAD_TIME))
504            self.procesed_per_sec=self.procesed_per_sec[-10:]
505            f=lambda x,y:x+y
506            suma_proc=reduce(f,self.procesed_per_sec)
507            self.procesed=total
508            total = 0
509            if THREADED:
510                out+="Queues: "
511                sizes=self.get_q_sizes()
512                out2=''
513                for x in sizes:
514                    out2+='{}={} '.format(x,sizes[x])
515                    total += sizes[x]
516                out+="TOTAL={} {}".format(total,out2)
517            if THREADED:
518                sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
519            else:
520                sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
521
522    def get_q_sizes(self):
523        sizes={}
524        for x in THREADS:
525            sizes[x]=self.db[x].q.qsize()
526        return sizes
527
528    def sum_q_sizes(self):
529        sizes=self.get_q_sizes()
530        f=lambda x,y: x+y
531        return reduce(f,[sizes[x] for x in sizes])
532
533    def all_queues_empty(self):
534        sizes=self.get_q_sizes()
535        for x in sizes:
536            if sizes[x] != 0:
537                return False
538        return True
539
540    def get_load(self):
541        nprocs=multiprocessing.cpu_count()
542        limit=nprocs*LIMIT
543        while not (self.terminate and not self.paused and self.finished):
544            self.print_stats()
545            self.load=os.getloadavg()[0]
546            if not self.terminate and self.load > limit:
547                self.paused = True
548            else:
549                self.paused = False
550            if self.all_queues_empty():
551                gc.collect()
552            self.MEM_USED=self.get_mem_usage()
553            time.sleep(CHECK_LOAD_TIME)
554
555    def start(self):
556        self.prepare_threads()
557
558    def end(self):
559        for x in self.db:
560            self.threads[x].join()
561            self.db[x].close_db()
562        self.finished = True
563        self.print_stats()
564
565
566def main(*args,**kwargs):
567    gc.enable()
568    if DAEMON:
569        fp = open('/var/run/analyticsd.pid','w');
570        fp.write(str(os.getpid()))
571        fp.close()
572    m = Monitor()
573    m.start()
574    print "start done"
575    while not m.terminate:
576        time.sleep(0.5)
577    m.end()
578    print "Exitting..."
579
580if __name__ == "__main__":
581    if DAEMON:
582        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analyticsd')):
583            main()
584    else:
585        main()
Note: See TracBrowser for help on using the repository browser.