source: lliurex-statistics/trunk/fuentes/lliurex-statistics.install/usr/sbin/analytics @ 2966

Last change on this file since 2966 was 2966, checked in by mabarracus, 3 years ago

Fix debugging mode into collector
Fix n4d checking

  • Property svn:executable set to *
File size: 14.3 KB
Line 
1#!/usr/bin/python
2
3import re
4import os
5import sys
6import signal
7import subprocess
8import threading
9import shlex
10import sqlite3
11from datetime import date
12import string
13import requests
14import json
15import ConfigParser
16import daemon
17import lockfile
18import xmlrpclib
19
20######### END CONFIG ###########
21
22
23#
24# Shows debug data
25#
26def debug(text='',type=1):
27    global DEBUG, daemon_mode, DEBUG_FILE
28    fp_debug=open(DEBUG_FILE,"a")
29    if DEBUG:
30        if type == 1:
31            if daemon_mode != True:
32                print str(text)
33            else:
34                fp_debug.write(str(text)+"\n")
35        else:
36            if daemon_mode != True:
37                sys.stdout.write(text)
38            else:
39                fp_debug.write(str(text))
40        sys.stdout.flush()
41    fp_debug.close()
42#END def debug(text='',type=1):
43
44#
45# Handler to manage interrupt properly, sync data before die when SIGINT
46#
47def sig_handler(sig,frame):
48    global t,LIST
49    debug("Aborting...")
50    try:
51        # Cancel threads
52        t.cancel()
53        # Sync data
54        update_db()
55        # Show stored data to stdout
56        show_list_db()
57    except:
58        sys.exit(1)
59    sys.exit(0)
60#END def sig_handler(sig,frame):
61
62#
63# Forces change file when SIGUSR1
64#
65def sig_handler_change(sig, frame):
66        change_file();
67
68#
69# Forces change file and update server database when SIGUSR1
70#
71def change_file():
72        global LIST,filename,next_filename,LOGPATH
73        # sync in-memory and temporary file data
74        update_db()
75        # clear in-memory list
76        LIST.clear()
77        # Check if permission flag is granted
78        if allow_send():
79            # Send data to master server
80            send_data()
81        else:
82            debug("Sending not allowed when try to change file")
83        # Get new temporary filename
84        next_filename=get_filename(LOGPATH)
85        # Sync data in-memory and temporary file data
86        update_db()
87        # Show curent data to stdout
88        show_list()
89#END def change_file():
90
91def check_server_acknowledge():
92        try:
93            c = xmlrpclib.ServerProxy("https://server:9779/")
94            return c.get_variable("","VariablesManager","STATS_ENABLED")
95        except:
96            return '0'
97#END def check_server_acknowledge()
98
99def check_local_acknowledge():
100        answer = None
101        if os.path.isfile('/etc/lliurex-analytics/status'):
102            fp = open('/etc/lliurex-analytics/status','r')
103            answer = fp.readline();
104            fp.close()
105        return answer.strip()
106
107#END check_local_acknowledge
108
109def allow_send():
110    answer = check_server_acknowledge()
111    if answer != '1':
112        answer = check_local_acknowledge()
113        try:
114            if answer == "yes":
115                debug("allow_send stats by check local acknowledge")
116                return True
117            else:
118                debug("deny allow_send stats by check server & local acknowledge")
119                return False
120        except Exception as e:
121            debug("deny allow_send stats by exception check server & local acknowledge")
122            return False
123    else:
124        debug("allow_send stats by check server acknowledge")
125        return True
126#END def allow_send()
127
128#
129# Sends data to server and clear all
130#
131def clean_and_send():
132        global LIST
133        # sync in-memory and temporary file data
134        update_db()
135        # clear in-memory list
136        LIST.clear()
137        if allow_send(): 
138            # Send data to master server
139            send_data()
140        else:
141            debug("Sending not allowed when try send file")
142        # Clear database file
143        clear_db()
144        # Sync data in-memory and temporary file data
145        update_db()
146#END def clean_and_send():
147
148#
149# Call show data when SIGINT2
150#
151def sig_handler_show(sig,frame):
152        debug("current filename "+filename)
153        show_list()
154        show_list_db()
155        global TIMEOUT
156        global BDTIMEOUT
157        global ctimeout
158        global btimeout
159        debug('TIMEOUT='+str(TIMEOUT)+',BDTIMEOUT='+str(BDTIMEOUT)+',ctimeout='+str(ctimeout)+',btimeout='+str(btimeout))
160#END def sig_handler_show(sig,frame):
161
162#
163# Global function called every second to run functions guided by timeout
164#
165def runner():
166        global t
167        global ctimeout
168        global btimeout
169        global TIMEOUT
170        global BDTIMEOUT
171
172        # Flush in-memory data to file db when expires timeout
173        if ctimeout > TIMEOUT:
174                update_db()
175                ctimeout=0
176        else:
177                ctimeout+=1
178
179        # Call send-data to server when expires dbtimeout
180        if btimeout > BDTIMEOUT:
181                clean_and_send();
182                btimeout=0
183        else:
184                btimeout+=1
185        debug('.',2)
186
187        # Reload 1 sec timeout
188        t=threading.Timer(1.0,runner)
189#        De-comment to allow stop threads inmediately
190#        t.daemon = True
191        t.start()
192#END def runner():
193
194#
195# Add item to in-memory LIST
196#
197def add_item(item):
198        global LIST,BLACKLIST,INTERPRETERS
199
200        # Split binary and parameters
201        #part=shlex.split(item)       
202        part=item.split(' ')
203        # Check binary name is correct
204        if ('/' in part[0]):
205                part[0]=part[0].split('/')[-1]         
206        # Check if binary name must be recorded
207        if (part[0] in BLACKLIST):
208                return
209        # Check if is a known interpreter
210        if (part[0] in INTERPRETERS):
211                part[0]=part[1]
212                # Check script name is correct
213                if ('/' in part[0]):
214                        part[0]=part[0].split('/')[-1]
215                # Check if it's a parameter
216                i=0
217                novalid=['-',':']
218                while (part[i][0] in novalid and i < len(part)):
219                        i+=1
220                if (i < len(part)):
221                        part[0]=part[i]
222                else:
223                        return
224        if ('/' in part[0]):
225                        part[0]=part[0].split('/')[-1]
226
227        # Check if binary name must be recorded
228        if (part[0] in BLACKLIST):
229                return
230
231        # Add or append to global list
232        if LIST.has_key(part[0]):
233                debug('+',2)
234                LIST[part[0]]+=1
235        else:
236                debug('*',2)
237                LIST[part[0]]=1
238#END def add_item(item):
239
240#
241# Show in-memory LIST and print to stdout
242#
243def show_list():
244        global LIST,daemon_mode,DEBUG_FILE
245        if daemon_mode != True:
246            print "Lista:"
247            print "~~~~~~~~~~"
248            for it in LIST:
249                print it + "->" + str(LIST[it])
250        else:
251            fp_debug=open(DEBUG_FILE,"a")
252            fp_debug.write("Lista:\n~~~~~~~\n");
253            for it in LIST:
254                fp_debug.write(it + "->" + str(LIST[it]) + "\n")
255            fp_debug.close()
256#END def show_list():
257
258#
259# Send data from temporary db to server using http post method
260#
261def send_data():
262    # Calculates mac address
263    f = open('/sys/class/net/eth0/address','r');
264    uid=f.read().strip();
265    f.close();
266
267    global filename;
268   
269    db = sqlite3.connect(filename);
270    cur= db.cursor();
271    try:
272        cur.execute('select * from info order by n DESC limit 100;');
273        json_tmp={};
274        # Dumps temporary database building json object to send
275        for x in cur.fetchall():
276            json_tmp[x[0]]=str(x[1]);
277    except:
278        return;
279    # Get version and flavour
280    p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
281    out,err=p.communicate()
282    version=out.strip();
283
284    p=subprocess.Popen(['lliurex-version','-v'],stdout=subprocess.PIPE)
285    out,err=p.communicate()
286    sabor=out.strip();
287
288    # Send json encoded data to server
289    jsonobj=json.dumps(json_tmp);
290    jsonobj=json.dumps({'uid':uid,'vers':version,'sab':sabor, 'stats': jsonobj });
291    payload = {'stats':jsonobj};
292    try:
293        debug("SENDING DATA:"+str(payload))
294    except Exception as e:
295        debug("Error debugging data to send")
296
297    r = requests.post(url,data=payload,headers=headers);
298    debug("S("+r.text+')',2);
299#END def send_data():
300
301#
302# Show list stored into db file
303#
304def show_list_db():
305    global filename,daemon_mode,DEBUG_FILE
306    db = sqlite3.connect(filename)
307    cur= db.cursor()
308    if daemon_mode != True:
309        print "TOP 10:"
310        print "~~~~~~~~~~"
311        try:
312                cur.execute('select * from info order by n DESC limit 10;')
313                for x in cur.fetchall():
314                        print x[0] + " " + str(x[1]);
315        except:
316                print "No data available"
317    else:
318        fp_debug=open(DEBUG_FILE,"a")
319        fp_debug.write("TOP 10:\n~~~~~~~~~~\n")
320        try:
321                cur.execute('select * from info order by n DESC limit 10;')
322                for x in cur.fetchall():
323                        fp_debug.write(x[0] + " " + str(x[1])+"\n")
324        except:
325                fp_debug.write("No data available\n")
326        fp_debug.close()
327    db.close()
328#END def show_list_db():
329
330#
331# Calculates new name to store temporary database
332#
333def get_filename(LOGPATH=''):
334        global filename
335        l=[]
336        if (LOGPATH != ''):
337            LOGPATH=LOGPATH+'/'
338        #Load all alphabet into list
339        for k in list(string.ascii_lowercase):
340                l.append(k)
341
342        #Load all alphabet into list
343        for k in list(string.ascii_lowercase):
344
345                # Load all alphabet (with two characters) into list
346                for p in list(string.ascii_lowercase):
347                        l.append(k+p)
348        # l = {a,b,c...y,z,aa,ab...zy,zz)
349        # Get current date
350        today=date.today()
351
352        current=LOGPATH+'stats-'+str(today)+'.db'
353        ant=''
354        # Get the next index for apply to file
355        for x in l:
356                if not os.path.isfile(current):
357                        filename=LOGPATH+'stats-'+str(today)+ant+'.db'
358                        return LOGPATH+'stats-'+str(today)+x+'.db'
359                        break;
360                else:
361                        ant=x;
362                current=LOGPATH+'stats-'+str(today)+ant+'.db'
363#END def get_filename():
364
365#
366# Clear file with stored database
367#
368def clear_db():
369        global filename
370        debug('C',2)
371        db = sqlite3.connect(filename)
372        cur = db.cursor()
373        cur.execute('delete from info;')
374        db.commit()
375        db.close()
376#END def clear_db():
377
378#
379# Update temporary database and updates global LIST
380#
381def update_db():
382        debug('U',2)
383        global LIST,filename
384        #debug("updating "+filename);
385        # Temporary list
386        L2={}
387        # Open file to store temporary exec ocurrences
388        db = sqlite3.connect(filename)
389        cur= db.cursor()
390        try:
391                cur.execute('select * from info;')
392        except:
393                cur.execute('create table info (cmd varchar(100) primary key,n int(10));');
394                cur.execute('select * from info;')
395        # Load into L2 all database records
396        for x in cur.fetchall():
397                L2[x[0]]=x[1]
398        # Append to LIST all previously added database records
399        for x in L2:
400                if not LIST.has_key(x):
401                        LIST[x]=L2[x]
402        # Clear database records
403        cur.execute('delete from info;')
404        values=[]
405        # Write all records from LIST into file
406        for x in LIST:
407                values.append((x,str(LIST[x])))
408        cur.executemany('insert into info values (?,?)',values)
409        db.commit()
410        db.close()
411#END def update_db():
412
413def daemonize():
414        debug('Running daemon...')
415        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analytics')):
416                start()
417#END def daemonize():
418
419def start():
420        fp = open('/var/run/analytics.pid','w');
421        fp.write(str(os.getpid()))
422        fp.close()
423        # Add handlers for functions, close, show, change file / update db
424        signal.signal(signal.SIGINT,sig_handler)
425        signal.signal(signal.SIGUSR1,sig_handler_change)
426        signal.signal(signal.SIGUSR2,sig_handler_show)
427        # Sync data
428        update_db()
429
430        t=threading.Timer(1.0,runner)
431        # De-comment to allow stop threads inmediately
432        #t.daemon = True
433        t.start()
434
435        p1= subprocess.Popen(["tail","-F",logfilename],stdout=subprocess.PIPE)
436
437
438        while True:
439                line = p1.stdout.readline()
440                if(re.search('type=EXECVE',line)):
441                        m=re.findall('a[0-9]+="([^"]+)"',line)
442                        t=""
443                        for i in m:
444                                t+=i + " "
445                        add_item(t)
446#END def start():
447               
448###################
449###### MAIN #######
450###################
451
452# enable / disable show more running data
453DEBUG=0
454# Global list in mem to store results
455LIST={}
456TIMEOUT=300
457BDTIMEOUT=300
458# Counters for threads going to timeout
459ctimeout=0
460btimeout=0
461
462# Get configuration parameters
463config = ConfigParser.ConfigParser()
464config.read('/etc/lliurex-analytics/agent.cfg')
465
466LOGPATH = config.get('Agent','logpath');
467DEBUG_FILE=LOGPATH+"/analytics-debug.log"
468# Filename for temporally files in disk (sqlite)
469filename=''  #initializes as global when is called get_filename
470next_filename=get_filename(LOGPATH)
471
472server = config.get('Server','server');
473server_path = config.get('Server','server-path');
474
475# Max timeout commiting changes to file
476TIMEOUT = config.getint('Agent','timeout')
477
478# Max timeout commiting changes from file to database
479BDTIMEOUT = config.getint('Agent','bdtimeout')
480
481agent = config.get('Agent','user-agent')
482headers = {'user-agent': agent};
483url="http://"+server+"/"+server_path;
484
485daemon_mode = config.getboolean('Agent','daemon')
486logfilename = config.get('Audit','file')
487
488blacklistname = config.get('Audit','blacklist')
489
490BLACKLIST = [line.rstrip('\n') for line in open(blacklistname)]
491INTERPRETERS = config.get('Audit','interpreters').split(',')
492
493debug("filename="+filename+' next='+next_filename)
494
495if __name__ == '__main__':
496        if (logfilename == 'stdin'):
497                if (len(sys.argv) < 2):
498                        sys.stderr.write('Error\n')
499                        sys.exit(1)
500                else:       
501                        logfilename=sys.argv[1]
502        else:
503                if daemon_mode == True:
504                        daemonize()
505                else:
506                        start()
507else:
508        if (logfilename == 'stdin'):
509                sys.stderr.write('file=stdin not valid running as module\n')
510                sys.exit(1)
511
Note: See TracBrowser for help on using the repository browser.