source: lliurex-statistics/trunk/fuentes/lliurex-statistics.install/usr/sbin/analytics @ 4866

Last change on this file since 4866 was 4866, checked in by mabarracus, 2 years ago

Fix race condition that stops sending messages when time changes
Optimize subprocessing calls
Better flavour detection

  • Property svn:executable set to *
File size: 15.7 KB
Line 
1#!/usr/bin/python
2
3import re
4import os
5import sys
6import signal
7import subprocess
8import threading
9import shlex
10import sqlite3
11from datetime import date
12import string
13import requests
14import json
15import ConfigParser
16import daemon
17import lockfile
18import xmlrpclib
19import time
20
21######### END CONFIG ###########
22
23
24#
25# Shows debug data
26#
27def debug(text='',type=1):
28    global DEBUG, daemon_mode, DEBUG_FILE
29    fp_debug=open(DEBUG_FILE,"a")
30    if DEBUG:
31        if type == 1:
32            if daemon_mode != True:
33                print str(text)
34            else:
35                fp_debug.write(str(text)+"\n")
36        else:
37            if daemon_mode != True:
38                sys.stdout.write(text)
39            else:
40                fp_debug.write(str(text))
41        sys.stdout.flush()
42    fp_debug.close()
43#END def debug(text='',type=1):
44
45#
46# Handler to manage interrupt properly, sync data before die when SIGINT
47#
48def sig_handler(sig,frame):
49    global t,LIST,RUNNER
50    debug("Aborting...")
51    RUNNER=0
52    try:
53        # Cancel threads
54        t.cancel()
55        # Sync data
56        update_db()
57        # Show stored data to stdout
58        show_list_db()
59    except:
60        sys.exit(1)
61    sys.exit(0)
62#END def sig_handler(sig,frame):
63
64#
65# Forces change file when SIGUSR1
66#
67def sig_handler_change(sig, frame):
68        change_file();
69
70#
71# Forces change file and update server database when SIGUSR1
72#
73def change_file():
74        global LIST,filename,next_filename,LOGPATH
75        # sync in-memory and temporary file data
76        update_db()
77        # clear in-memory list
78        LIST.clear()
79        # Check if permission flag is granted
80        if allow_send():
81            # Send data to master server
82            send_data()
83        else:
84            debug("Sending not allowed when try to change file")
85        # Get new temporary filename
86        next_filename=get_filename(LOGPATH)
87        # Sync data in-memory and temporary file data
88        update_db()
89        # Show curent data to stdout
90        show_list()
91#END def change_file():
92
93def check_server_acknowledge():
94        try:
95            c = xmlrpclib.ServerProxy("https://server:9779/")
96            return c.get_variable("","VariablesManager","STATS_ENABLED")
97        except:
98            return 'None'
99#END def check_server_acknowledge()
100
101def check_local_acknowledge():
102        answer = None
103        if os.path.isfile('/etc/lliurex-analytics/status'):
104            fp = open('/etc/lliurex-analytics/status','r')
105            answer = fp.readline();
106            fp.close()
107        return answer.strip()
108
109#END check_local_acknowledge
110
111def allow_send():
112    global SABOR
113
114    if SABOR.lower() != 'server':
115        answer = check_server_acknowledge()
116        if answer == '1':
117            debug("allow_send stats by check server acknowledge")
118            return True
119        if answer == '0':
120            debug("deny allow_send stats by check server acknowledge")
121            return False
122
123    answer = check_local_acknowledge()
124    if answer == "yes":
125        debug("allow_send stats by check local acknowledge")
126        return True
127    if answer == 'no':
128        debug("deny allow_send by check local acknowledge")
129        return False
130
131    debug("deny allow_send stats by exception check server & local acknowledge")
132    return False
133
134#END def allow_send()
135
136#
137# Sends data to server and clear all
138#
139def clean_and_send():
140        global LIST
141        # sync in-memory and temporary file data
142        update_db()
143        # clear in-memory list
144        LIST.clear()
145        if allow_send(): 
146            # Send data to master server
147            send_data()
148        else:
149            debug("Sending not allowed when try send file")
150        # Clear database file
151        clear_db()
152        # Sync data in-memory and temporary file data
153        update_db()
154#END def clean_and_send():
155
156#
157# Call show data when SIGINT2
158#
159def sig_handler_show(sig,frame):
160        debug("current filename "+filename)
161        show_list()
162        show_list_db()
163        global TIMEOUT
164        global BDTIMEOUT
165        global ctimeout
166        global btimeout
167        debug('TIMEOUT='+str(TIMEOUT)+',BDTIMEOUT='+str(BDTIMEOUT)+',ctimeout='+str(ctimeout)+',btimeout='+str(btimeout))
168#END def sig_handler_show(sig,frame):
169
170#
171# Global function called every second to run functions guided by timeout
172#
173def runner():
174        global t
175        global ctimeout
176        global btimeout
177        global TIMEOUT
178        global BDTIMEOUT
179        global RUNNER
180       
181        while RUNNER == 1:
182            # Flush in-memory data to file db when expires timeout
183            if ctimeout > TIMEOUT:
184                update_db()
185                ctimeout=0
186            else:
187                ctimeout+=1
188
189            # Call send-data to server when expires dbtimeout
190            if btimeout > BDTIMEOUT:
191                clean_and_send();
192                btimeout=0
193            else:
194                btimeout+=1
195            debug('.',2)
196            time.sleep(1.0)
197            # Reload 1 sec timeout
198            #t=threading.Timer(1.0,runner)
199#           De-comment to allow stop threads inmediately
200#           t.daemon = True
201            #t.start()
202#END def runner():
203
204#
205# Add item to in-memory LIST
206#
207def add_item(item):
208        global LIST,BLACKLIST,INTERPRETERS
209
210        # Split binary and parameters
211        #part=shlex.split(item)       
212        part=item.split(' ')
213        # Check binary name is correct
214        if ('/' in part[0]):
215                part[0]=part[0].split('/')[-1]         
216        # Check if binary name must be recorded
217        if (part[0] in BLACKLIST):
218                return
219        # Check if is a known interpreter
220        if (part[0] in INTERPRETERS):
221                part[0]=part[1]
222                # Check script name is correct
223                if ('/' in part[0]):
224                        part[0]=part[0].split('/')[-1]
225                # Check if it's a parameter
226                i=0
227                novalid=['-',':']
228                while (part[i][0] in novalid and i < len(part)):
229                        i+=1
230                if (i < len(part)):
231                        part[0]=part[i]
232                else:
233                        return
234        if ('/' in part[0]):
235                        part[0]=part[0].split('/')[-1]
236
237        # Check if binary name must be recorded
238        if (part[0] in BLACKLIST):
239                return
240
241        # Add or append to global list
242        if LIST.has_key(part[0]):
243                debug('+',2)
244                LIST[part[0]]+=1
245        else:
246                debug('*',2)
247                LIST[part[0]]=1
248#END def add_item(item):
249
250#
251# Show in-memory LIST and print to stdout
252#
253def show_list():
254        global LIST,daemon_mode,DEBUG_FILE
255        if daemon_mode != True:
256            print "Lista:"
257            print "~~~~~~~~~~"
258            for it in LIST:
259                print it + "->" + str(LIST[it])
260        else:
261            fp_debug=open(DEBUG_FILE,"a")
262            fp_debug.write("Lista:\n~~~~~~~\n");
263            for it in LIST:
264                fp_debug.write(it + "->" + str(LIST[it]) + "\n")
265            fp_debug.close()
266#END def show_list():
267
268#
269# Send data from temporary db to server using http post method
270#
271def send_data():
272    global SABOR
273    global RELEASE
274
275    # Calculates mac address
276    f = open('/sys/class/net/eth0/address','r');
277    uid=f.read().strip();
278    f.close();
279
280    global filename,url,proxy,headers;
281   
282    db = sqlite3.connect(filename);
283    cur= db.cursor();
284    try:
285        cur.execute('select * from info order by n DESC limit 100;');
286        json_tmp={};
287        # Dumps temporary database building json object to send
288        for x in cur.fetchall():
289            json_tmp[x[0]]=str(x[1]);
290    except:
291        return;
292   
293    # Get version and flavour
294    #p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
295    #out,err=p.communicate()
296    #version=out.strip();
297    version=RELEASE
298
299    #p=subprocess.Popen(['lliurex-version','-f'],stdout=subprocess.PIPE)
300    #out,err=p.communicate()
301    #sabor=out.strip();
302    sabor=SABOR
303
304    # Send json encoded data to server
305    jsonobj=json.dumps(json_tmp);
306    jsonobj=json.dumps({'uid':uid,'vers':version,'sab':sabor, 'stats': jsonobj });
307    payload = {'stats':jsonobj};
308    try:
309        debug("SENDING DATA:"+str(payload))
310    except Exception as e:
311        debug("Error debugging data to send")
312    #debug('proxy set to '+proxy)
313    p={}
314    p['http']=proxy
315    #debug('p='+str(p))
316    try:
317        if proxy != '':
318            r = requests.post(url,data=payload,headers=headers,proxies=p,timeout=5)
319        else:
320            r = requests.post(url,data=payload,headers=headers,timeout=5)
321        debug("S("+r.text+')',2);
322    except Exception as e:
323        debug("Error sending request: "+str(e))
324#END def send_data():
325
326#
327# Show list stored into db file
328#
329def show_list_db():
330    global filename,daemon_mode,DEBUG_FILE
331    db = sqlite3.connect(filename)
332    cur= db.cursor()
333    if daemon_mode != True:
334        print "TOP 10:"
335        print "~~~~~~~~~~"
336        try:
337                cur.execute('select * from info order by n DESC limit 10;')
338                for x in cur.fetchall():
339                        print x[0] + " " + str(x[1]);
340        except:
341                print "No data available"
342    else:
343        fp_debug=open(DEBUG_FILE,"a")
344        fp_debug.write("TOP 10:\n~~~~~~~~~~\n")
345        try:
346                cur.execute('select * from info order by n DESC limit 10;')
347                for x in cur.fetchall():
348                        fp_debug.write(x[0] + " " + str(x[1])+"\n")
349        except:
350                fp_debug.write("No data available\n")
351        fp_debug.close()
352    db.close()
353#END def show_list_db():
354
355#
356# Calculates new name to store temporary database
357#
358def get_filename(LOGPATH=''):
359        global filename
360        l=[]
361        if (LOGPATH != ''):
362            LOGPATH=LOGPATH+'/'
363        #Load all alphabet into list
364        for k in list(string.ascii_lowercase):
365                l.append(k)
366
367        #Load all alphabet into list
368        for k in list(string.ascii_lowercase):
369
370                # Load all alphabet (with two characters) into list
371                for p in list(string.ascii_lowercase):
372                        l.append(k+p)
373        # l = {a,b,c...y,z,aa,ab...zy,zz)
374        # Get current date
375        today=date.today()
376
377        current=LOGPATH+'stats-'+str(today)+'.db'
378        ant=''
379        # Get the next index for apply to file
380        for x in l:
381                if not os.path.isfile(current):
382                        filename=LOGPATH+'stats-'+str(today)+ant+'.db'
383                        return LOGPATH+'stats-'+str(today)+x+'.db'
384                        break;
385                else:
386                        ant=x;
387                current=LOGPATH+'stats-'+str(today)+ant+'.db'
388#END def get_filename():
389
390#
391# Clear file with stored database
392#
393def clear_db():
394        global filename
395        debug('C',2)
396        db = sqlite3.connect(filename)
397        cur = db.cursor()
398        cur.execute('delete from info;')
399        db.commit()
400        db.close()
401#END def clear_db():
402
403#
404# Update temporary database and updates global LIST
405#
406def update_db():
407        debug('U',1)
408        global LIST,filename
409        #debug("updating "+filename);
410        # Temporary list
411        L2={}
412        # Open file to store temporary exec ocurrences
413        db = sqlite3.connect(filename)
414        cur= db.cursor()
415        try:
416                cur.execute('select * from info;')
417        except:
418                cur.execute('create table info (cmd varchar(100) primary key,n int(10));');
419                cur.execute('select * from info;')
420        # Load into L2 all database records
421        for x in cur.fetchall():
422                L2[x[0]]=x[1]
423        # Append to LIST all previously added database records
424        for x in L2:
425                if not LIST.has_key(x):
426                        LIST[x]=L2[x]
427        # Clear database records
428        cur.execute('delete from info;')
429        values=[]
430        # Write all records from LIST into file
431        for x in LIST:
432                values.append((x,str(LIST[x])))
433        cur.executemany('insert into info values (?,?)',values)
434        db.commit()
435        db.close()
436#END def update_db():
437
438def daemonize():
439        debug('Running daemon...')
440        try:
441            with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analytics')):
442                start()
443        except Exception as e:
444            debug("Error daemonizing "+str(e))
445#END def daemonize():
446
447def start():
448        fp = open('/var/run/analytics.pid','w');
449        fp.write(str(os.getpid()))
450        fp.close()
451        # Add handlers for functions, close, show, change file / update db
452        signal.signal(signal.SIGINT,sig_handler)
453        signal.signal(signal.SIGUSR1,sig_handler_change)
454        signal.signal(signal.SIGUSR2,sig_handler_show)
455        # Sync data
456        update_db()
457
458        t=threading.Timer(1.0,runner)
459        # De-comment to allow stop threads inmediately
460        #t.daemon = True
461        t.start()
462
463        p1= subprocess.Popen(["tail","-F",logfilename],stdout=subprocess.PIPE)
464
465
466        while True:
467                line = p1.stdout.readline()
468                if(re.search('type=EXECVE',line)):
469                        m=re.findall('a[0-9]+="([^"]+)"',line)
470                        t=""
471                        for i in m:
472                                t+=i + " "
473                        add_item(t)
474#END def start():
475               
476###################
477###### MAIN #######
478###################
479
480# enable / disable show more running data
481DEBUG=0
482# Global list in mem to store results
483LIST={}
484TIMEOUT=300
485BDTIMEOUT=300
486# Counters for threads going to timeout
487ctimeout=0
488btimeout=0
489RUNNER=1
490
491# Get configuration parameters
492config = ConfigParser.ConfigParser()
493config.read('/etc/lliurex-analytics/agent.cfg')
494
495LOGPATH = config.get('Agent','logpath');
496DEBUG_FILE=LOGPATH+"/analytics-debug.log"
497# Filename for temporally files in disk (sqlite)
498filename=''  #initializes as global when is called get_filename
499next_filename=get_filename(LOGPATH)
500
501server = config.get('Server','server');
502server_path = config.get('Server','server-path');
503
504# Max timeout commiting changes to file
505TIMEOUT = config.getint('Agent','timeout')
506
507# Max timeout commiting changes from file to database
508BDTIMEOUT = config.getint('Agent','bdtimeout')
509
510agent = config.get('Agent','user-agent')
511headers = {'user-agent': agent};
512url="http://"+server+"/"+server_path;
513
514daemon_mode = config.getboolean('Agent','daemon')
515logfilename = config.get('Audit','file')
516
517blacklistname = config.get('Audit','blacklist')
518
519BLACKLIST = [line.rstrip('\n') for line in open(blacklistname)]
520INTERPRETERS = config.get('Audit','interpreters').split(',')
521
522debug("filename="+filename+' next='+next_filename)
523
524p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
525RELEASE,err=p.communicate()
526RELEASE=RELEASE.strip()
527if RELEASE[0:2] == '15':
528    VERSION_GETTER='lliurex-detect'
529else:
530    VERSION_GETTER='lliurex-version'
531
532
533p=subprocess.Popen([VERSION_GETTER,'-f'],stdout=subprocess.PIPE)
534SABOR,err=p.communicate()
535SABOR=SABOR.strip()
536
537debug('Starting on '+SABOR+' '+RELEASE)
538try:
539     px= subprocess.Popen(["bash","-c","source /etc/profile && echo $http_proxy"],stdout=subprocess.PIPE)
540     proxy=px.stdout.readline().strip()
541     if str(proxy) != '':
542        debug('Detected proxy: '+str(proxy))
543except Exception as e:
544     debug('Error getting proxy '+str(e))
545
546if __name__ == '__main__':
547        if (logfilename == 'stdin'):
548                if (len(sys.argv) < 2):
549                        sys.stderr.write('Error\n')
550                        sys.exit(1)
551                else:       
552                        logfilename=sys.argv[1]
553        else:
554                if daemon_mode == True:
555                        daemonize()
556                else:
557                        start()
558else:
559        if (logfilename == 'stdin'):
560                sys.stderr.write('file=stdin not valid running as module\n')
561                sys.exit(1)
562
Note: See TracBrowser for help on using the repository browser.