source: lliurex-statistics/trunk/fuentes/lliurex-statistics.install/usr/sbin/analytics @ 6776

Last change on this file since 6776 was 6776, checked in by mabarracus, 20 months ago

increase sending apps

  • Property svn:executable set to *
File size: 15.8 KB
Line 
1#!/usr/bin/python
2
3import re
4import os
5import sys
6import signal
7import subprocess
8import threading
9import shlex
10import sqlite3
11from datetime import date
12import string
13import requests
14import json
15import ConfigParser
16import daemon
17import lockfile
18import xmlrpclib
19import time
20
21######### END CONFIG ###########
22
23
24#
25# Shows debug data
26#
27def debug(text='',type=1):
28    global DEBUG, daemon_mode, DEBUG_FILE
29    fp_debug=open(DEBUG_FILE,"a")
30    if DEBUG:
31        if type == 1:
32            if daemon_mode != True:
33                print str(text)
34            else:
35                fp_debug.write(str(text)+"\n")
36        else:
37            if daemon_mode != True:
38                sys.stdout.write(text)
39            else:
40                fp_debug.write(str(text))
41        sys.stdout.flush()
42    fp_debug.close()
43#END def debug(text='',type=1):
44
45#
46# Handler to manage interrupt properly, sync data before die when SIGINT
47#
48def sig_handler(sig,frame):
49    global t,LIST,RUNNER
50    debug("Aborting...")
51    RUNNER=0
52    try:
53        # Cancel threads
54        t.cancel()
55        # Sync data
56        update_db()
57        # Show stored data to stdout
58        show_list_db()
59    except:
60        sys.exit(1)
61    sys.exit(0)
62#END def sig_handler(sig,frame):
63
64#
65# Forces change file when SIGUSR1
66#
67def sig_handler_change(sig, frame):
68        change_file();
69
70#
71# Forces change file and update server database when SIGUSR1
72#
73def change_file():
74        global LIST,filename,next_filename,LOGPATH
75        # sync in-memory and temporary file data
76        update_db()
77        # clear in-memory list
78        LIST.clear()
79        # Check if permission flag is granted
80        if allow_send():
81            # Send data to master server
82            send_data()
83        else:
84            debug("Sending not allowed when try to change file")
85        # Get new temporary filename
86        next_filename=get_filename(LOGPATH)
87        # Sync data in-memory and temporary file data
88        update_db()
89        # Show curent data to stdout
90        show_list()
91#END def change_file():
92
93def check_server_acknowledge():
94        try:
95            c = xmlrpclib.ServerProxy("https://server:9779/")
96            return c.get_variable("","VariablesManager","STATS_ENABLED")
97        except:
98            return 'None'
99#END def check_server_acknowledge()
100
101def check_local_acknowledge():
102        answer = None
103        if os.path.isfile('/etc/lliurex-analytics/status'):
104            fp = open('/etc/lliurex-analytics/status','r')
105            answer = fp.readline();
106            fp.close()
107        return answer.strip()
108
109#END check_local_acknowledge
110
111def allow_send():
112    global SABOR
113
114    if SABOR.lower() != 'server':
115        answer = check_server_acknowledge()
116        if answer == '1':
117            debug("allow_send stats by check server acknowledge")
118            return True
119        if answer == '0':
120            debug("deny allow_send stats by check server acknowledge")
121            return False
122
123    answer = check_local_acknowledge()
124    if answer == "yes":
125        debug("allow_send stats by check local acknowledge")
126        return True
127    if answer == 'no':
128        debug("deny allow_send by check local acknowledge")
129        return False
130
131    debug("deny allow_send stats by exception check server & local acknowledge")
132    return False
133
134#END def allow_send()
135
136#
137# Sends data to server and clear all
138#
139def clean_and_send():
140        global LIST
141        # sync in-memory and temporary file data
142        update_db()
143        # clear in-memory list
144        LIST.clear()
145        if allow_send(): 
146            # Send data to master server
147            send_data()
148        else:
149            debug("Sending not allowed when try send file")
150        # Clear database file
151        clear_db()
152        # Sync data in-memory and temporary file data
153        update_db()
154#END def clean_and_send():
155
156#
157# Call show data when SIGINT2
158#
159def sig_handler_show(sig,frame):
160        debug("current filename "+filename)
161        show_list()
162        show_list_db()
163        global TIMEOUT
164        global BDTIMEOUT
165        global ctimeout
166        global btimeout
167        debug('TIMEOUT='+str(TIMEOUT)+',BDTIMEOUT='+str(BDTIMEOUT)+',ctimeout='+str(ctimeout)+',btimeout='+str(btimeout))
168#END def sig_handler_show(sig,frame):
169
170#
171# Global function called every second to run functions guided by timeout
172#
173def runner():
174        global t
175        global ctimeout
176        global btimeout
177        global TIMEOUT
178        global BDTIMEOUT
179        global RUNNER
180       
181        while RUNNER == 1:
182            # Flush in-memory data to file db when expires timeout
183            if ctimeout > TIMEOUT:
184                update_db()
185                ctimeout=0
186            else:
187                ctimeout+=1
188
189            # Call send-data to server when expires dbtimeout
190            if btimeout > BDTIMEOUT:
191                clean_and_send();
192                btimeout=0
193            else:
194                btimeout+=1
195            debug('.',2)
196            time.sleep(1.0)
197            # Reload 1 sec timeout
198            #t=threading.Timer(1.0,runner)
199#           De-comment to allow stop threads inmediately
200#           t.daemon = True
201            #t.start()
202#END def runner():
203
204#
205# Add item to in-memory LIST
206#
207def add_item(item):
208        global LIST,BLACKLIST,INTERPRETERS
209
210        # Split binary and parameters
211        #part=shlex.split(item)       
212        part=item.split(' ')
213        # Check binary name is correct
214        if ('/' in part[0]):
215                part[0]=part[0].split('/')[-1]         
216        # Check if binary name must be recorded
217        if (part[0] in BLACKLIST):
218                return
219        # Check if is a known interpreter
220        if (part[0] in INTERPRETERS):
221                part[0]=part[1]
222                # Check script name is correct
223                if ('/' in part[0]):
224                        part[0]=part[0].split('/')[-1]
225                # Check if it's a parameter
226                i=0
227                novalid=['-',':']
228                while (part[i][0] in novalid and i < len(part)):
229                        i+=1
230                if (i < len(part)):
231                        part[0]=part[i]
232                else:
233                        return
234        if ('/' in part[0]):
235                        part[0]=part[0].split('/')[-1]
236
237        # Check if binary name must be recorded
238        if (part[0] in BLACKLIST):
239                return
240
241        # Add or append to global list
242        if LIST.has_key(part[0]):
243                debug('+',2)
244                LIST[part[0]]+=1
245        else:
246                debug('*',2)
247                LIST[part[0]]=1
248#END def add_item(item):
249
250#
251# Show in-memory LIST and print to stdout
252#
253def show_list():
254        global LIST,daemon_mode,DEBUG_FILE
255        if daemon_mode != True:
256            print "Lista:"
257            print "~~~~~~~~~~"
258            for it in LIST:
259                print it + "->" + str(LIST[it])
260        else:
261            fp_debug=open(DEBUG_FILE,"a")
262            fp_debug.write("Lista:\n~~~~~~~\n");
263            for it in LIST:
264                fp_debug.write(it + "->" + str(LIST[it]) + "\n")
265            fp_debug.close()
266#END def show_list():
267
268#
269# Send data from temporary db to server using http post method
270#
271def send_data():
272    global SABOR
273    global RELEASE
274
275    # Calculates mac address
276    f = open('/sys/class/net/eth0/address','r');
277    uid=f.read().strip();
278    f.close();
279
280    global filename,url,proxy,headers,SEND_ITEMS;
281   
282    db = sqlite3.connect(filename);
283    cur= db.cursor();
284    try:
285        cur.execute('select * from info order by n DESC limit {};'.format(SEND_ITEMS));
286        json_tmp={};
287        # Dumps temporary database building json object to send
288        for x in cur.fetchall():
289            json_tmp[x[0]]=str(x[1]);
290    except:
291        return;
292   
293    # Get version and flavour
294    #p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
295    #out,err=p.communicate()
296    #version=out.strip();
297    version=RELEASE
298
299    #p=subprocess.Popen(['lliurex-version','-f'],stdout=subprocess.PIPE)
300    #out,err=p.communicate()
301    #sabor=out.strip();
302    sabor=SABOR
303
304    # Send json encoded data to server
305    jsonobj=json.dumps(json_tmp);
306    jsonobj=json.dumps({'uid':uid,'vers':version,'sab':sabor, 'stats': jsonobj });
307    payload = {'stats':jsonobj};
308    try:
309        debug("SENDING DATA:"+str(payload))
310    except Exception as e:
311        debug("Error debugging data to send")
312    #debug('proxy set to '+proxy)
313    p={}
314    p['http']=proxy
315    #debug('p='+str(p))
316    try:
317        if proxy != '':
318            r = requests.post(url,data=payload,headers=headers,proxies=p,timeout=5)
319        else:
320            r = requests.post(url,data=payload,headers=headers,timeout=5)
321        debug("S("+r.text+')',2);
322    except Exception as e:
323        debug("Error sending request: "+str(e))
324#END def send_data():
325
326#
327# Show list stored into db file
328#
329def show_list_db():
330    global filename,daemon_mode,DEBUG_FILE,SEND_ITEMS
331    db = sqlite3.connect(filename)
332    cur= db.cursor()
333    if daemon_mode != True:
334        print "TOP {}:".format(SEND_ITEMS)
335        print "~~~~~~~~~~"
336        try:
337                cur.execute('select * from info order by n DESC limit {};'.format(SEND_ITEMS))
338                for x in cur.fetchall():
339                        print x[0] + " " + str(x[1]);
340        except:
341                print "No data available"
342    else:
343        fp_debug=open(DEBUG_FILE,"a")
344        fp_debug.write("TOP 10:\n~~~~~~~~~~\n")
345        try:
346                cur.execute('select * from info order by n DESC limit {};'.format(SEND_ITEMS))
347                for x in cur.fetchall():
348                        fp_debug.write(x[0] + " " + str(x[1])+"\n")
349        except:
350                fp_debug.write("No data available\n")
351        fp_debug.close()
352    db.close()
353#END def show_list_db():
354
355#
356# Calculates new name to store temporary database
357#
358def get_filename(LOGPATH=''):
359        global filename
360        l=[]
361        if (LOGPATH != ''):
362            LOGPATH=LOGPATH+'/'
363        #Load all alphabet into list
364        for k in list(string.ascii_lowercase):
365                l.append(k)
366
367        #Load all alphabet into list
368        for k in list(string.ascii_lowercase):
369
370                # Load all alphabet (with two characters) into list
371                for p in list(string.ascii_lowercase):
372                        l.append(k+p)
373        # l = {a,b,c...y,z,aa,ab...zy,zz)
374        # Get current date
375        today=date.today()
376
377        current=LOGPATH+'stats-'+str(today)+'.db'
378        ant=''
379        # Get the next index for apply to file
380        for x in l:
381                if not os.path.isfile(current):
382                        filename=LOGPATH+'stats-'+str(today)+ant+'.db'
383                        return LOGPATH+'stats-'+str(today)+x+'.db'
384                        break;
385                else:
386                        ant=x;
387                current=LOGPATH+'stats-'+str(today)+ant+'.db'
388#END def get_filename():
389
390#
391# Clear file with stored database
392#
393def clear_db():
394        global filename
395        debug('C',2)
396        db = sqlite3.connect(filename)
397        cur = db.cursor()
398        cur.execute('delete from info;')
399        db.commit()
400        db.close()
401#END def clear_db():
402
403#
404# Update temporary database and updates global LIST
405#
406def update_db():
407        debug('U',1)
408        global LIST,filename
409        #debug("updating "+filename);
410        # Temporary list
411        L2={}
412        # Open file to store temporary exec ocurrences
413        db = sqlite3.connect(filename)
414        cur= db.cursor()
415        try:
416                cur.execute('select * from info;')
417        except:
418                cur.execute('create table info (cmd varchar(100) primary key,n int(10));');
419                cur.execute('select * from info;')
420        # Load into L2 all database records
421        for x in cur.fetchall():
422                L2[x[0]]=x[1]
423        # Append to LIST all previously added database records
424        for x in L2:
425                if not LIST.has_key(x):
426                        LIST[x]=L2[x]
427        # Clear database records
428        cur.execute('delete from info;')
429        values=[]
430        # Write all records from LIST into file
431        for x in LIST:
432                values.append((x,str(LIST[x])))
433        cur.executemany('insert into info values (?,?)',values)
434        db.commit()
435        db.close()
436#END def update_db():
437
438def daemonize():
439        debug('Running daemon...')
440        try:
441            with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analytics')):
442                start()
443        except Exception as e:
444            debug("Error daemonizing "+str(e))
445#END def daemonize():
446
447def start():
448        fp = open('/var/run/analytics.pid','w');
449        fp.write(str(os.getpid()))
450        fp.close()
451        # Add handlers for functions, close, show, change file / update db
452        signal.signal(signal.SIGINT,sig_handler)
453        signal.signal(signal.SIGUSR1,sig_handler_change)
454        signal.signal(signal.SIGUSR2,sig_handler_show)
455        # Sync data
456        update_db()
457
458        t=threading.Timer(1.0,runner)
459        # De-comment to allow stop threads inmediately
460        #t.daemon = True
461        t.start()
462
463        p1= subprocess.Popen(["tail","-F",logfilename],stdout=subprocess.PIPE)
464
465
466        while True:
467                line = p1.stdout.readline()
468                if(re.search('type=EXECVE',line)):
469                        m=re.findall('a[0-9]+="([^"]+)"',line)
470                        t=""
471                        for i in m:
472                                t+=i + " "
473                        add_item(t)
474#END def start():
475               
476###################
477###### MAIN #######
478###################
479
480# enable / disable show more running data
481DEBUG=0
482# Global list in mem to store results
483LIST={}
484TIMEOUT=300
485BDTIMEOUT=300
486SEND_ITEMS=100
487# Counters for threads going to timeout
488ctimeout=0
489btimeout=0
490RUNNER=1
491
492# Get configuration parameters
493config = ConfigParser.ConfigParser()
494config.read('/etc/lliurex-analytics/agent.cfg')
495
496LOGPATH = config.get('Agent','logpath');
497DEBUG_FILE=LOGPATH+"/analytics-debug.log"
498# Filename for temporally files in disk (sqlite)
499filename=''  #initializes as global when is called get_filename
500next_filename=get_filename(LOGPATH)
501
502server = config.get('Server','server');
503server_path = config.get('Server','server-path');
504
505# Max timeout commiting changes to file
506TIMEOUT = config.getint('Agent','timeout')
507
508# Max timeout commiting changes from file to database
509BDTIMEOUT = config.getint('Agent','bdtimeout')
510
511agent = config.get('Agent','user-agent')
512headers = {'user-agent': agent};
513url="http://"+server+"/"+server_path;
514
515daemon_mode = config.getboolean('Agent','daemon')
516logfilename = config.get('Audit','file')
517
518blacklistname = config.get('Audit','blacklist')
519
520BLACKLIST = [line.rstrip('\n') for line in open(blacklistname)]
521INTERPRETERS = config.get('Audit','interpreters').split(',')
522
523debug("filename="+filename+' next='+next_filename)
524
525p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
526RELEASE,err=p.communicate()
527RELEASE=RELEASE.strip()
528if RELEASE[0:2] == '15':
529    VERSION_GETTER='lliurex-detect'
530else:
531    VERSION_GETTER='lliurex-version'
532
533
534p=subprocess.Popen([VERSION_GETTER,'-f'],stdout=subprocess.PIPE)
535SABOR,err=p.communicate()
536SABOR=SABOR.strip()
537
538debug('Starting on '+SABOR+' '+RELEASE)
539try:
540     px= subprocess.Popen(["bash","-c","source /etc/profile && echo $http_proxy"],stdout=subprocess.PIPE)
541     proxy=px.stdout.readline().strip()
542     if str(proxy) != '':
543        debug('Detected proxy: '+str(proxy))
544except Exception as e:
545     debug('Error getting proxy '+str(e))
546
547if __name__ == '__main__':
548        if (logfilename == 'stdin'):
549                if (len(sys.argv) < 2):
550                        sys.stderr.write('Error\n')
551                        sys.exit(1)
552                else:       
553                        logfilename=sys.argv[1]
554        else:
555                if daemon_mode == True:
556                        daemonize()
557                else:
558                        start()
559else:
560        if (logfilename == 'stdin'):
561                sys.stderr.write('file=stdin not valid running as module\n')
562                sys.exit(1)
563
Note: See TracBrowser for help on using the repository browser.