source: lliurex-statistics/trunk/fuentes/lliurex-statistics.install/usr/sbin/analytics @ 3456

Last change on this file since 3456 was 3456, checked in by mabarracus, 3 years ago

Fix priority of server over local acknowledge

  • Property svn:executable set to *
File size: 15.0 KB
Line 
1#!/usr/bin/python
2
3import re
4import os
5import sys
6import signal
7import subprocess
8import threading
9import shlex
10import sqlite3
11from datetime import date
12import string
13import requests
14import json
15import ConfigParser
16import daemon
17import lockfile
18import xmlrpclib
19
20######### END CONFIG ###########
21
22
23#
24# Shows debug data
25#
26def debug(text='',type=1):
27    global DEBUG, daemon_mode, DEBUG_FILE
28    fp_debug=open(DEBUG_FILE,"a")
29    if DEBUG:
30        if type == 1:
31            if daemon_mode != True:
32                print str(text)
33            else:
34                fp_debug.write(str(text)+"\n")
35        else:
36            if daemon_mode != True:
37                sys.stdout.write(text)
38            else:
39                fp_debug.write(str(text))
40        sys.stdout.flush()
41    fp_debug.close()
42#END def debug(text='',type=1):
43
44#
45# Handler to manage interrupt properly, sync data before die when SIGINT
46#
47def sig_handler(sig,frame):
48    global t,LIST
49    debug("Aborting...")
50    try:
51        # Cancel threads
52        t.cancel()
53        # Sync data
54        update_db()
55        # Show stored data to stdout
56        show_list_db()
57    except:
58        sys.exit(1)
59    sys.exit(0)
60#END def sig_handler(sig,frame):
61
62#
63# Forces change file when SIGUSR1
64#
65def sig_handler_change(sig, frame):
66        change_file();
67
68#
69# Forces change file and update server database when SIGUSR1
70#
71def change_file():
72        global LIST,filename,next_filename,LOGPATH
73        # sync in-memory and temporary file data
74        update_db()
75        # clear in-memory list
76        LIST.clear()
77        # Check if permission flag is granted
78        if allow_send():
79            # Send data to master server
80            send_data()
81        else:
82            debug("Sending not allowed when try to change file")
83        # Get new temporary filename
84        next_filename=get_filename(LOGPATH)
85        # Sync data in-memory and temporary file data
86        update_db()
87        # Show curent data to stdout
88        show_list()
89#END def change_file():
90
91def check_server_acknowledge():
92        try:
93            c = xmlrpclib.ServerProxy("https://server:9779/")
94            return c.get_variable("","VariablesManager","STATS_ENABLED")
95        except:
96            return 'None'
97#END def check_server_acknowledge()
98
99def check_local_acknowledge():
100        answer = None
101        if os.path.isfile('/etc/lliurex-analytics/status'):
102            fp = open('/etc/lliurex-analytics/status','r')
103            answer = fp.readline();
104            fp.close()
105        return answer.strip()
106
107#END check_local_acknowledge
108
109def allow_send():
110    answer = check_server_acknowledge()
111    if answer == '1':
112        debug("allow_send stats by check server acknowledge")
113        return True
114    if answer == '0':
115        debug("deny allow_send stats by check server acknowledge")
116        return False
117   
118    answer = check_local_acknowledge()
119    if answer == "yes":
120        debug("allow_send stats by check local acknowledge")
121        return True
122    if answer == 'no':
123        debug("deny allow_send by check local acknowledge")
124        return False
125
126    debug("deny allow_send stats by exception check server & local acknowledge")
127    return False
128
129#END def allow_send()
130
131#
132# Sends data to server and clear all
133#
134def clean_and_send():
135        global LIST
136        # sync in-memory and temporary file data
137        update_db()
138        # clear in-memory list
139        LIST.clear()
140        if allow_send(): 
141            # Send data to master server
142            send_data()
143        else:
144            debug("Sending not allowed when try send file")
145        # Clear database file
146        clear_db()
147        # Sync data in-memory and temporary file data
148        update_db()
149#END def clean_and_send():
150
151#
152# Call show data when SIGINT2
153#
154def sig_handler_show(sig,frame):
155        debug("current filename "+filename)
156        show_list()
157        show_list_db()
158        global TIMEOUT
159        global BDTIMEOUT
160        global ctimeout
161        global btimeout
162        debug('TIMEOUT='+str(TIMEOUT)+',BDTIMEOUT='+str(BDTIMEOUT)+',ctimeout='+str(ctimeout)+',btimeout='+str(btimeout))
163#END def sig_handler_show(sig,frame):
164
165#
166# Global function called every second to run functions guided by timeout
167#
168def runner():
169        global t
170        global ctimeout
171        global btimeout
172        global TIMEOUT
173        global BDTIMEOUT
174
175        # Flush in-memory data to file db when expires timeout
176        if ctimeout > TIMEOUT:
177                update_db()
178                ctimeout=0
179        else:
180                ctimeout+=1
181
182        # Call send-data to server when expires dbtimeout
183        if btimeout > BDTIMEOUT:
184                clean_and_send();
185                btimeout=0
186        else:
187                btimeout+=1
188        debug('.',2)
189
190        # Reload 1 sec timeout
191        t=threading.Timer(1.0,runner)
192#        De-comment to allow stop threads inmediately
193#        t.daemon = True
194        t.start()
195#END def runner():
196
197#
198# Add item to in-memory LIST
199#
200def add_item(item):
201        global LIST,BLACKLIST,INTERPRETERS
202
203        # Split binary and parameters
204        #part=shlex.split(item)       
205        part=item.split(' ')
206        # Check binary name is correct
207        if ('/' in part[0]):
208                part[0]=part[0].split('/')[-1]         
209        # Check if binary name must be recorded
210        if (part[0] in BLACKLIST):
211                return
212        # Check if is a known interpreter
213        if (part[0] in INTERPRETERS):
214                part[0]=part[1]
215                # Check script name is correct
216                if ('/' in part[0]):
217                        part[0]=part[0].split('/')[-1]
218                # Check if it's a parameter
219                i=0
220                novalid=['-',':']
221                while (part[i][0] in novalid and i < len(part)):
222                        i+=1
223                if (i < len(part)):
224                        part[0]=part[i]
225                else:
226                        return
227        if ('/' in part[0]):
228                        part[0]=part[0].split('/')[-1]
229
230        # Check if binary name must be recorded
231        if (part[0] in BLACKLIST):
232                return
233
234        # Add or append to global list
235        if LIST.has_key(part[0]):
236                debug('+',2)
237                LIST[part[0]]+=1
238        else:
239                debug('*',2)
240                LIST[part[0]]=1
241#END def add_item(item):
242
243#
244# Show in-memory LIST and print to stdout
245#
246def show_list():
247        global LIST,daemon_mode,DEBUG_FILE
248        if daemon_mode != True:
249            print "Lista:"
250            print "~~~~~~~~~~"
251            for it in LIST:
252                print it + "->" + str(LIST[it])
253        else:
254            fp_debug=open(DEBUG_FILE,"a")
255            fp_debug.write("Lista:\n~~~~~~~\n");
256            for it in LIST:
257                fp_debug.write(it + "->" + str(LIST[it]) + "\n")
258            fp_debug.close()
259#END def show_list():
260
261#
262# Send data from temporary db to server using http post method
263#
264def send_data():
265    # Calculates mac address
266    f = open('/sys/class/net/eth0/address','r');
267    uid=f.read().strip();
268    f.close();
269
270    global filename,url,proxy,headers;
271   
272    db = sqlite3.connect(filename);
273    cur= db.cursor();
274    try:
275        cur.execute('select * from info order by n DESC limit 100;');
276        json_tmp={};
277        # Dumps temporary database building json object to send
278        for x in cur.fetchall():
279            json_tmp[x[0]]=str(x[1]);
280    except:
281        return;
282    # Get version and flavour
283    p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
284    out,err=p.communicate()
285    version=out.strip();
286
287    p=subprocess.Popen(['lliurex-version','-v'],stdout=subprocess.PIPE)
288    out,err=p.communicate()
289    sabor=out.strip();
290
291    # Send json encoded data to server
292    jsonobj=json.dumps(json_tmp);
293    jsonobj=json.dumps({'uid':uid,'vers':version,'sab':sabor, 'stats': jsonobj });
294    payload = {'stats':jsonobj};
295    try:
296        debug("SENDING DATA:"+str(payload))
297    except Exception as e:
298        debug("Error debugging data to send")
299    #debug('proxy set to '+proxy)
300    p={}
301    p['http']=proxy
302    #debug('p='+str(p))
303    try:
304        if proxy != '':
305            r = requests.post(url,data=payload,headers=headers,proxies=p,timeout=5)
306        else:
307            r = requests.post(url,data=payload,headers=headers,timeout=5)
308        debug("S("+r.text+')',2);
309    except Exception as e:
310        debug("Error sending request: "+str(e))
311#END def send_data():
312
313#
314# Show list stored into db file
315#
316def show_list_db():
317    global filename,daemon_mode,DEBUG_FILE
318    db = sqlite3.connect(filename)
319    cur= db.cursor()
320    if daemon_mode != True:
321        print "TOP 10:"
322        print "~~~~~~~~~~"
323        try:
324                cur.execute('select * from info order by n DESC limit 10;')
325                for x in cur.fetchall():
326                        print x[0] + " " + str(x[1]);
327        except:
328                print "No data available"
329    else:
330        fp_debug=open(DEBUG_FILE,"a")
331        fp_debug.write("TOP 10:\n~~~~~~~~~~\n")
332        try:
333                cur.execute('select * from info order by n DESC limit 10;')
334                for x in cur.fetchall():
335                        fp_debug.write(x[0] + " " + str(x[1])+"\n")
336        except:
337                fp_debug.write("No data available\n")
338        fp_debug.close()
339    db.close()
340#END def show_list_db():
341
342#
343# Calculates new name to store temporary database
344#
345def get_filename(LOGPATH=''):
346        global filename
347        l=[]
348        if (LOGPATH != ''):
349            LOGPATH=LOGPATH+'/'
350        #Load all alphabet into list
351        for k in list(string.ascii_lowercase):
352                l.append(k)
353
354        #Load all alphabet into list
355        for k in list(string.ascii_lowercase):
356
357                # Load all alphabet (with two characters) into list
358                for p in list(string.ascii_lowercase):
359                        l.append(k+p)
360        # l = {a,b,c...y,z,aa,ab...zy,zz)
361        # Get current date
362        today=date.today()
363
364        current=LOGPATH+'stats-'+str(today)+'.db'
365        ant=''
366        # Get the next index for apply to file
367        for x in l:
368                if not os.path.isfile(current):
369                        filename=LOGPATH+'stats-'+str(today)+ant+'.db'
370                        return LOGPATH+'stats-'+str(today)+x+'.db'
371                        break;
372                else:
373                        ant=x;
374                current=LOGPATH+'stats-'+str(today)+ant+'.db'
375#END def get_filename():
376
377#
378# Clear file with stored database
379#
380def clear_db():
381        global filename
382        debug('C',2)
383        db = sqlite3.connect(filename)
384        cur = db.cursor()
385        cur.execute('delete from info;')
386        db.commit()
387        db.close()
388#END def clear_db():
389
390#
391# Update temporary database and updates global LIST
392#
393def update_db():
394        debug('U',1)
395        global LIST,filename
396        #debug("updating "+filename);
397        # Temporary list
398        L2={}
399        # Open file to store temporary exec ocurrences
400        db = sqlite3.connect(filename)
401        cur= db.cursor()
402        try:
403                cur.execute('select * from info;')
404        except:
405                cur.execute('create table info (cmd varchar(100) primary key,n int(10));');
406                cur.execute('select * from info;')
407        # Load into L2 all database records
408        for x in cur.fetchall():
409                L2[x[0]]=x[1]
410        # Append to LIST all previously added database records
411        for x in L2:
412                if not LIST.has_key(x):
413                        LIST[x]=L2[x]
414        # Clear database records
415        cur.execute('delete from info;')
416        values=[]
417        # Write all records from LIST into file
418        for x in LIST:
419                values.append((x,str(LIST[x])))
420        cur.executemany('insert into info values (?,?)',values)
421        db.commit()
422        db.close()
423#END def update_db():
424
425def daemonize():
426        debug('Running daemon...')
427        try:
428            with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analytics')):
429                start()
430        except Exception as e:
431            debug("Error daemonizing "+str(e))
432#END def daemonize():
433
434def start():
435        fp = open('/var/run/analytics.pid','w');
436        fp.write(str(os.getpid()))
437        fp.close()
438        # Add handlers for functions, close, show, change file / update db
439        signal.signal(signal.SIGINT,sig_handler)
440        signal.signal(signal.SIGUSR1,sig_handler_change)
441        signal.signal(signal.SIGUSR2,sig_handler_show)
442        # Sync data
443        update_db()
444
445        t=threading.Timer(1.0,runner)
446        # De-comment to allow stop threads inmediately
447        #t.daemon = True
448        t.start()
449
450        p1= subprocess.Popen(["tail","-F",logfilename],stdout=subprocess.PIPE)
451
452
453        while True:
454                line = p1.stdout.readline()
455                if(re.search('type=EXECVE',line)):
456                        m=re.findall('a[0-9]+="([^"]+)"',line)
457                        t=""
458                        for i in m:
459                                t+=i + " "
460                        add_item(t)
461#END def start():
462               
463###################
464###### MAIN #######
465###################
466
467# enable / disable show more running data
468DEBUG=0
469# Global list in mem to store results
470LIST={}
471TIMEOUT=300
472BDTIMEOUT=300
473# Counters for threads going to timeout
474ctimeout=0
475btimeout=0
476
477# Get configuration parameters
478config = ConfigParser.ConfigParser()
479config.read('/etc/lliurex-analytics/agent.cfg')
480
481LOGPATH = config.get('Agent','logpath');
482DEBUG_FILE=LOGPATH+"/analytics-debug.log"
483# Filename for temporally files in disk (sqlite)
484filename=''  #initializes as global when is called get_filename
485next_filename=get_filename(LOGPATH)
486
487server = config.get('Server','server');
488server_path = config.get('Server','server-path');
489
490# Max timeout commiting changes to file
491TIMEOUT = config.getint('Agent','timeout')
492
493# Max timeout commiting changes from file to database
494BDTIMEOUT = config.getint('Agent','bdtimeout')
495
496agent = config.get('Agent','user-agent')
497headers = {'user-agent': agent};
498url="http://"+server+"/"+server_path;
499
500daemon_mode = config.getboolean('Agent','daemon')
501logfilename = config.get('Audit','file')
502
503blacklistname = config.get('Audit','blacklist')
504
505BLACKLIST = [line.rstrip('\n') for line in open(blacklistname)]
506INTERPRETERS = config.get('Audit','interpreters').split(',')
507
508debug("filename="+filename+' next='+next_filename)
509try:
510     px= subprocess.Popen(["bash","-c","source /etc/profile && echo $http_proxy"],stdout=subprocess.PIPE)
511     proxy=px.stdout.readline().strip()
512     debug('Detected proxy: '+str(proxy))
513except Exception as e:
514     debug('Error getting proxy '+str(e))
515
516if __name__ == '__main__':
517        if (logfilename == 'stdin'):
518                if (len(sys.argv) < 2):
519                        sys.stderr.write('Error\n')
520                        sys.exit(1)
521                else:       
522                        logfilename=sys.argv[1]
523        else:
524                if daemon_mode == True:
525                        daemonize()
526                else:
527                        start()
528else:
529        if (logfilename == 'stdin'):
530                sys.stderr.write('file=stdin not valid running as module\n')
531                sys.exit(1)
532
Note: See TracBrowser for help on using the repository browser.