source: lliurex-statistics/trunk/fuentes/lliurex-statistics.install/usr/sbin/analytics @ 2962

Last change on this file since 2962 was 2962, checked in by mabarracus, 3 years ago

Add debug verbosity
Fixes confirmator logic
Fixes acl's

  • Property svn:executable set to *
File size: 14.3 KB
RevLine 
[2773]1#!/usr/bin/python
2
3import re
4import os
5import sys
6import signal
7import subprocess
8import threading
9import shlex
10import sqlite3
11from datetime import date
12import string
13import requests
14import json
15import ConfigParser
16import daemon
17import lockfile
[2948]18import xmlrpclib
[2773]19
20######### END CONFIG ###########
21
22
23#
24# Shows debug data
25#
26def debug(text='',type=1):
27    global DEBUG, daemon_mode, DEBUG_FILE
28    fp_debug=open(DEBUG_FILE,"a")
29    if DEBUG:
30        if type == 1:
31            if daemon_mode != True:
32                print str(text)
33            else:
34                fp_debug.write(str(text)+"\n")
35        else:
36            if daemon_mode != True:
37                sys.stdout.write(text)
38            else:
39                fp_debug.write(str(text))
40        sys.stdout.flush()
41    fp_debug.close()
42#END def debug(text='',type=1):
43
44#
45# Handler to manage interrupt properly, sync data before die when SIGINT
46#
47def sig_handler(sig,frame):
48    global t,LIST
49    debug("Aborting...")
50    try:
51        # Cancel threads
52        t.cancel()
53        # Sync data
54        update_db()
55        # Show stored data to stdout
56        show_list_db()
57    except:
58        sys.exit(1)
59    sys.exit(0)
60#END def sig_handler(sig,frame):
61
62#
63# Forces change file when SIGUSR1
64#
65def sig_handler_change(sig, frame):
66        change_file();
67
68#
69# Forces change file and update server database when SIGUSR1
70#
71def change_file():
72        global LIST,filename,next_filename,LOGPATH
73        # sync in-memory and temporary file data
74        update_db()
75        # clear in-memory list
76        LIST.clear()
77        # Check if permission flag is granted
[2948]78        if allow_send():
79            # Send data to master server
80            send_data()
[2962]81        else:
82            debug("Sending not allowed when try to change file")
[2773]83        # Get new temporary filename
84        next_filename=get_filename(LOGPATH)
85        # Sync data in-memory and temporary file data
86        update_db()
87        # Show curent data to stdout
88        show_list()
89#END def change_file():
90
[2948]91def check_server_acknowledge():
92        c = xmlrpclib.ServerProxy("https://server:9779/")
93        return c.get_variable("","VariablesManager","STATS_ENABLED")
94#END def check_server_acknowledge()
95
96def check_local_acknowledge():
97        answer = None
98        if os.path.isfile('/etc/lliurex-analytics/status'):
99            fp = open('/etc/lliurex-analytics/status','r')
100            answer = fp.readline();
101            fp.close()
102        return answer.strip()
103
104#END check_local_acknowledge
105
106def allow_send():
107    answer = check_server_acknowledge()
108    if answer != '1':
109        answer = check_local_acknowledge()
110        try:
111            if answer == "yes":
[2962]112                debug("allow_send stats by check local acknowledge")
[2948]113                return True
114            else:
[2962]115                debug("deny allow_send stats by check server & local acknowledge")
[2948]116                return False
117        except Exception as e:
[2962]118            debug("deny allow_send stats by exception check server & local acknowledge")
[2948]119            return False
120    else:
[2962]121        debug("allow_send stats by check server acknowledge")
[2948]122        return True
123#END def allow_send()
124
[2773]125#
126# Sends data to server and clear all
127#
128def clean_and_send():
129        global LIST
130        # sync in-memory and temporary file data
131        update_db()
132        # clear in-memory list
133        LIST.clear()
[2948]134        if allow_send(): 
135            # Send data to master server
136            send_data()
[2962]137        else
138            debug("Sending not allowed when try send file")
[2773]139        # Clear database file
140        clear_db()
141        # Sync data in-memory and temporary file data
142        update_db()
143#END def clean_and_send():
144
145#
146# Call show data when SIGINT2
147#
148def sig_handler_show(sig,frame):
149        debug("current filename "+filename)
150        show_list()
151        show_list_db()
152        global TIMEOUT
153        global BDTIMEOUT
154        global ctimeout
155        global btimeout
156        debug('TIMEOUT='+str(TIMEOUT)+',BDTIMEOUT='+str(BDTIMEOUT)+',ctimeout='+str(ctimeout)+',btimeout='+str(btimeout))
157#END def sig_handler_show(sig,frame):
158
159#
160# Global function called every second to run functions guided by timeout
161#
162def runner():
163        global t
164        global ctimeout
165        global btimeout
166        global TIMEOUT
167        global BDTIMEOUT
168
169        # Flush in-memory data to file db when expires timeout
170        if ctimeout > TIMEOUT:
171                update_db()
172                ctimeout=0
173        else:
174                ctimeout+=1
175
176        # Call send-data to server when expires dbtimeout
177        if btimeout > BDTIMEOUT:
178                clean_and_send();
179                btimeout=0
180        else:
181                btimeout+=1
182        debug('.',2)
183
184        # Reload 1 sec timeout
185        t=threading.Timer(1.0,runner)
186#        De-comment to allow stop threads inmediately
187#        t.daemon = True
188        t.start()
189#END def runner():
190
191#
192# Add item to in-memory LIST
193#
194def add_item(item):
195        global LIST,BLACKLIST,INTERPRETERS
196
197        # Split binary and parameters
198        #part=shlex.split(item)       
199        part=item.split(' ')
200        # Check binary name is correct
201        if ('/' in part[0]):
202                part[0]=part[0].split('/')[-1]         
203        # Check if binary name must be recorded
204        if (part[0] in BLACKLIST):
205                return
206        # Check if is a known interpreter
207        if (part[0] in INTERPRETERS):
208                part[0]=part[1]
209                # Check script name is correct
210                if ('/' in part[0]):
211                        part[0]=part[0].split('/')[-1]
212                # Check if it's a parameter
213                i=0
214                novalid=['-',':']
215                while (part[i][0] in novalid and i < len(part)):
216                        i+=1
217                if (i < len(part)):
218                        part[0]=part[i]
219                else:
220                        return
221        if ('/' in part[0]):
222                        part[0]=part[0].split('/')[-1]
223
224        # Check if binary name must be recorded
225        if (part[0] in BLACKLIST):
226                return
227
228        # Add or append to global list
229        if LIST.has_key(part[0]):
230                debug('+',2)
231                LIST[part[0]]+=1
232        else:
233                debug('*',2)
234                LIST[part[0]]=1
235#END def add_item(item):
236
237#
238# Show in-memory LIST and print to stdout
239#
240def show_list():
241        global LIST,daemon_mode,DEBUG_FILE
242        if daemon_mode != True:
243            print "Lista:"
244            print "~~~~~~~~~~"
245            for it in LIST:
246                print it + "->" + str(LIST[it])
247        else:
248            fp_debug=open(DEBUG_FILE,"a")
249            fp_debug.write("Lista:\n~~~~~~~\n");
250            for it in LIST:
251                fp_debug.write(it + "->" + str(LIST[it]) + "\n")
252            fp_debug.close()
253#END def show_list():
254
255#
256# Send data from temporary db to server using http post method
257#
258def send_data():
259    # Calculates mac address
260    f = open('/sys/class/net/eth0/address','r');
261    uid=f.read().strip();
262    f.close();
263
264    global filename;
265   
266    db = sqlite3.connect(filename);
267    cur= db.cursor();
268    try:
269        cur.execute('select * from info order by n DESC limit 100;');
270        json_tmp={};
271        # Dumps temporary database building json object to send
272        for x in cur.fetchall():
273            json_tmp[x[0]]=str(x[1]);
274    except:
275        return;
276    # Get version and flavour
277    p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
278    out,err=p.communicate()
279    version=out.strip();
280
281    p=subprocess.Popen(['lliurex-version','-v'],stdout=subprocess.PIPE)
282    out,err=p.communicate()
283    sabor=out.strip();
284
285    # Send json encoded data to server
286    jsonobj=json.dumps(json_tmp);
287    jsonobj=json.dumps({'uid':uid,'vers':version,'sab':sabor, 'stats': jsonobj });
288    payload = {'stats':jsonobj};
[2962]289    try:
290        debug("SENDING DATA:"+str(payload))
291    except Exception as e:
292        debug("Error debugging data to send")
293
[2773]294    r = requests.post(url,data=payload,headers=headers);
295    debug("S("+r.text+')',2);
296#END def send_data():
297
298#
299# Show list stored into db file
300#
301def show_list_db():
302    global filename,daemon_mode,DEBUG_FILE
303    db = sqlite3.connect(filename)
304    cur= db.cursor()
305    if daemon_mode != True:
306        print "TOP 10:"
307        print "~~~~~~~~~~"
308        try:
309                cur.execute('select * from info order by n DESC limit 10;')
310                for x in cur.fetchall():
311                        print x[0] + " " + str(x[1]);
312        except:
313                print "No data available"
314    else:
315        fp_debug=open(DEBUG_FILE,"a")
316        fp_debug.write("TOP 10:\n~~~~~~~~~~\n")
317        try:
318                cur.execute('select * from info order by n DESC limit 10;')
319                for x in cur.fetchall():
320                        fp_debug.write(x[0] + " " + str(x[1])+"\n")
321        except:
322                fp_debug.write("No data available\n")
323        fp_debug.close()
324    db.close()
325#END def show_list_db():
326
327#
328# Calculates new name to store temporary database
329#
330def get_filename(LOGPATH=''):
331        global filename
332        l=[]
333        if (LOGPATH != ''):
334            LOGPATH=LOGPATH+'/'
335        #Load all alphabet into list
336        for k in list(string.ascii_lowercase):
337                l.append(k)
338
339        #Load all alphabet into list
340        for k in list(string.ascii_lowercase):
341
342                # Load all alphabet (with two characters) into list
343                for p in list(string.ascii_lowercase):
344                        l.append(k+p)
345        # l = {a,b,c...y,z,aa,ab...zy,zz)
346        # Get current date
347        today=date.today()
348
349        current=LOGPATH+'stats-'+str(today)+'.db'
350        ant=''
351        # Get the next index for apply to file
352        for x in l:
353                if not os.path.isfile(current):
354                        filename=LOGPATH+'stats-'+str(today)+ant+'.db'
355                        return LOGPATH+'stats-'+str(today)+x+'.db'
356                        break;
357                else:
358                        ant=x;
359                current=LOGPATH+'stats-'+str(today)+ant+'.db'
360#END def get_filename():
361
362#
363# Clear file with stored database
364#
365def clear_db():
366        global filename
367        debug('C',2)
368        db = sqlite3.connect(filename)
369        cur = db.cursor()
370        cur.execute('delete from info;')
371        db.commit()
372        db.close()
373#END def clear_db():
374
375#
376# Update temporary database and updates global LIST
377#
378def update_db():
379        debug('U',2)
380        global LIST,filename
381        #debug("updating "+filename);
382        # Temporary list
383        L2={}
384        # Open file to store temporary exec ocurrences
385        db = sqlite3.connect(filename)
386        cur= db.cursor()
387        try:
388                cur.execute('select * from info;')
389        except:
390                cur.execute('create table info (cmd varchar(100) primary key,n int(10));');
391                cur.execute('select * from info;')
392        # Load into L2 all database records
393        for x in cur.fetchall():
394                L2[x[0]]=x[1]
395        # Append to LIST all previously added database records
396        for x in L2:
397                if not LIST.has_key(x):
398                        LIST[x]=L2[x]
399        # Clear database records
400        cur.execute('delete from info;')
401        values=[]
402        # Write all records from LIST into file
403        for x in LIST:
404                values.append((x,str(LIST[x])))
405        cur.executemany('insert into info values (?,?)',values)
406        db.commit()
407        db.close()
408#END def update_db():
409
410def daemonize():
411        debug('Running daemon...')
412        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analytics')):
413                start()
414#END def daemonize():
415
416def start():
417        fp = open('/var/run/analytics.pid','w');
418        fp.write(str(os.getpid()))
419        fp.close()
420        # Add handlers for functions, close, show, change file / update db
421        signal.signal(signal.SIGINT,sig_handler)
422        signal.signal(signal.SIGUSR1,sig_handler_change)
423        signal.signal(signal.SIGUSR2,sig_handler_show)
424        # Sync data
425        update_db()
426
427        t=threading.Timer(1.0,runner)
428        # De-comment to allow stop threads inmediately
429        #t.daemon = True
430        t.start()
431
432        p1= subprocess.Popen(["tail","-F",logfilename],stdout=subprocess.PIPE)
433
434
435        while True:
436                line = p1.stdout.readline()
437                if(re.search('type=EXECVE',line)):
438                        m=re.findall('a[0-9]+="([^"]+)"',line)
439                        t=""
440                        for i in m:
441                                t+=i + " "
442                        add_item(t)
443#END def start():
444               
445###################
446###### MAIN #######
447###################
448
449# enable / disable show more running data
450DEBUG=0
451# Global list in mem to store results
452LIST={}
453TIMEOUT=300
454BDTIMEOUT=300
455# Counters for threads going to timeout
456ctimeout=0
457btimeout=0
458
459# Get configuration parameters
460config = ConfigParser.ConfigParser()
461config.read('/etc/lliurex-analytics/agent.cfg')
462
463LOGPATH = config.get('Agent','logpath');
464DEBUG_FILE=LOGPATH+"/analytics-debug.log"
465# Filename for temporally files in disk (sqlite)
466filename=''  #initializes as global when is called get_filename
467next_filename=get_filename(LOGPATH)
468debug("filename="+filename+' next='+next_filename)
469
470server = config.get('Server','server');
471server_path = config.get('Server','server-path');
472
473# Max timeout commiting changes to file
474TIMEOUT = config.getint('Agent','timeout')
475
476# Max timeout commiting changes from file to database
477BDTIMEOUT = config.getint('Agent','bdtimeout')
478
479agent = config.get('Agent','user-agent')
480headers = {'user-agent': agent};
481url="http://"+server+"/"+server_path;
482
483daemon_mode = config.getboolean('Agent','daemon')
484logfilename = config.get('Audit','file')
485
486blacklistname = config.get('Audit','blacklist')
487
488BLACKLIST = [line.rstrip('\n') for line in open(blacklistname)]
489INTERPRETERS = config.get('Audit','interpreters').split(',')
490
491
492
493if __name__ == '__main__':
494        if (logfilename == 'stdin'):
495                if (len(sys.argv) < 2):
496                        sys.stderr.write('Error\n')
497                        sys.exit(1)
498                else:       
499                        logfilename=sys.argv[1]
500        else:
501                if daemon_mode == True:
502                        daemonize()
503                else:
504                        start()
505else:
506        if (logfilename == 'stdin'):
507                sys.stderr.write('file=stdin not valid running as module\n')
508                sys.exit(1)
509
Note: See TracBrowser for help on using the repository browser.