source: lliurex-statistics/trunk/fuentes/lliurex-statistics.install/usr/sbin/analytics @ 3330

Last change on this file since 3330 was 3330, checked in by mabarracus, 3 years ago

Allow proxy usage & network timeouts when sending data

  • Property svn:executable set to *
File size: 14.4 KB
Line 
1#!/usr/bin/python
2
3import re
4import os
5import sys
6import signal
7import subprocess
8import threading
9import shlex
10import sqlite3
11from datetime import date
12import string
13import requests
14import json
15import ConfigParser
16import daemon
17import lockfile
18import xmlrpclib
19import urllib
20
21######### END CONFIG ###########
22
23
24#
25# Shows debug data
26#
27def debug(text='',type=1):
28    global DEBUG, daemon_mode, DEBUG_FILE
29    fp_debug=open(DEBUG_FILE,"a")
30    if DEBUG:
31        if type == 1:
32            if daemon_mode != True:
33                print str(text)
34            else:
35                fp_debug.write(str(text)+"\n")
36        else:
37            if daemon_mode != True:
38                sys.stdout.write(text)
39            else:
40                fp_debug.write(str(text))
41        sys.stdout.flush()
42    fp_debug.close()
43#END def debug(text='',type=1):
44
45#
46# Handler to manage interrupt properly, sync data before die when SIGINT
47#
48def sig_handler(sig,frame):
49    global t,LIST
50    debug("Aborting...")
51    try:
52        # Cancel threads
53        t.cancel()
54        # Sync data
55        update_db()
56        # Show stored data to stdout
57        show_list_db()
58    except:
59        sys.exit(1)
60    sys.exit(0)
61#END def sig_handler(sig,frame):
62
63#
64# Forces change file when SIGUSR1
65#
66def sig_handler_change(sig, frame):
67        change_file();
68
69#
70# Forces change file and update server database when SIGUSR1
71#
72def change_file():
73        global LIST,filename,next_filename,LOGPATH
74        # sync in-memory and temporary file data
75        update_db()
76        # clear in-memory list
77        LIST.clear()
78        # Check if permission flag is granted
79        if allow_send():
80            # Send data to master server
81            send_data()
82        else:
83            debug("Sending not allowed when try to change file")
84        # Get new temporary filename
85        next_filename=get_filename(LOGPATH)
86        # Sync data in-memory and temporary file data
87        update_db()
88        # Show curent data to stdout
89        show_list()
90#END def change_file():
91
92def check_server_acknowledge():
93        try:
94            c = xmlrpclib.ServerProxy("https://server:9779/")
95            return c.get_variable("","VariablesManager","STATS_ENABLED")
96        except:
97            return '0'
98#END def check_server_acknowledge()
99
100def check_local_acknowledge():
101        answer = None
102        if os.path.isfile('/etc/lliurex-analytics/status'):
103            fp = open('/etc/lliurex-analytics/status','r')
104            answer = fp.readline();
105            fp.close()
106        return answer.strip()
107
108#END check_local_acknowledge
109
110def allow_send():
111    answer = check_server_acknowledge()
112    if answer != '1':
113        answer = check_local_acknowledge()
114        try:
115            if answer == "yes":
116                debug("allow_send stats by check local acknowledge")
117                return True
118            else:
119                debug("deny allow_send stats by check server & local acknowledge")
120                return False
121        except Exception as e:
122            debug("deny allow_send stats by exception check server & local acknowledge")
123            return False
124    else:
125        debug("allow_send stats by check server acknowledge")
126        return True
127#END def allow_send()
128
129#
130# Sends data to server and clear all
131#
132def clean_and_send():
133        global LIST
134        # sync in-memory and temporary file data
135        update_db()
136        # clear in-memory list
137        LIST.clear()
138        if allow_send(): 
139            # Send data to master server
140            send_data()
141        else:
142            debug("Sending not allowed when try send file")
143        # Clear database file
144        clear_db()
145        # Sync data in-memory and temporary file data
146        update_db()
147#END def clean_and_send():
148
149#
150# Call show data when SIGINT2
151#
152def sig_handler_show(sig,frame):
153        debug("current filename "+filename)
154        show_list()
155        show_list_db()
156        global TIMEOUT
157        global BDTIMEOUT
158        global ctimeout
159        global btimeout
160        debug('TIMEOUT='+str(TIMEOUT)+',BDTIMEOUT='+str(BDTIMEOUT)+',ctimeout='+str(ctimeout)+',btimeout='+str(btimeout))
161#END def sig_handler_show(sig,frame):
162
163#
164# Global function called every second to run functions guided by timeout
165#
166def runner():
167        global t
168        global ctimeout
169        global btimeout
170        global TIMEOUT
171        global BDTIMEOUT
172
173        # Flush in-memory data to file db when expires timeout
174        if ctimeout > TIMEOUT:
175                update_db()
176                ctimeout=0
177        else:
178                ctimeout+=1
179
180        # Call send-data to server when expires dbtimeout
181        if btimeout > BDTIMEOUT:
182                clean_and_send();
183                btimeout=0
184        else:
185                btimeout+=1
186        debug('.',2)
187
188        # Reload 1 sec timeout
189        t=threading.Timer(1.0,runner)
190#        De-comment to allow stop threads inmediately
191#        t.daemon = True
192        t.start()
193#END def runner():
194
195#
196# Add item to in-memory LIST
197#
198def add_item(item):
199        global LIST,BLACKLIST,INTERPRETERS
200
201        # Split binary and parameters
202        #part=shlex.split(item)       
203        part=item.split(' ')
204        # Check binary name is correct
205        if ('/' in part[0]):
206                part[0]=part[0].split('/')[-1]         
207        # Check if binary name must be recorded
208        if (part[0] in BLACKLIST):
209                return
210        # Check if is a known interpreter
211        if (part[0] in INTERPRETERS):
212                part[0]=part[1]
213                # Check script name is correct
214                if ('/' in part[0]):
215                        part[0]=part[0].split('/')[-1]
216                # Check if it's a parameter
217                i=0
218                novalid=['-',':']
219                while (part[i][0] in novalid and i < len(part)):
220                        i+=1
221                if (i < len(part)):
222                        part[0]=part[i]
223                else:
224                        return
225        if ('/' in part[0]):
226                        part[0]=part[0].split('/')[-1]
227
228        # Check if binary name must be recorded
229        if (part[0] in BLACKLIST):
230                return
231
232        # Add or append to global list
233        if LIST.has_key(part[0]):
234                debug('+',2)
235                LIST[part[0]]+=1
236        else:
237                debug('*',2)
238                LIST[part[0]]=1
239#END def add_item(item):
240
241#
242# Show in-memory LIST and print to stdout
243#
244def show_list():
245        global LIST,daemon_mode,DEBUG_FILE
246        if daemon_mode != True:
247            print "Lista:"
248            print "~~~~~~~~~~"
249            for it in LIST:
250                print it + "->" + str(LIST[it])
251        else:
252            fp_debug=open(DEBUG_FILE,"a")
253            fp_debug.write("Lista:\n~~~~~~~\n");
254            for it in LIST:
255                fp_debug.write(it + "->" + str(LIST[it]) + "\n")
256            fp_debug.close()
257#END def show_list():
258
259#
260# Send data from temporary db to server using http post method
261#
262def send_data():
263    # Calculates mac address
264    f = open('/sys/class/net/eth0/address','r');
265    uid=f.read().strip();
266    f.close();
267
268    global filename;
269   
270    db = sqlite3.connect(filename);
271    cur= db.cursor();
272    try:
273        cur.execute('select * from info order by n DESC limit 100;');
274        json_tmp={};
275        # Dumps temporary database building json object to send
276        for x in cur.fetchall():
277            json_tmp[x[0]]=str(x[1]);
278    except:
279        return;
280    # Get version and flavour
281    p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
282    out,err=p.communicate()
283    version=out.strip();
284
285    p=subprocess.Popen(['lliurex-version','-v'],stdout=subprocess.PIPE)
286    out,err=p.communicate()
287    sabor=out.strip();
288
289    # Send json encoded data to server
290    jsonobj=json.dumps(json_tmp);
291    jsonobj=json.dumps({'uid':uid,'vers':version,'sab':sabor, 'stats': jsonobj });
292    payload = {'stats':jsonobj};
293    try:
294        debug("SENDING DATA:"+str(payload))
295    except Exception as e:
296        debug("Error debugging data to send")
297
298    r = requests.post(url,data=payload,headers=headers,proxies=urllib.getproxies(),timeout=5);
299    debug("S("+r.text+')',2);
300#END def send_data():
301
302#
303# Show list stored into db file
304#
305def show_list_db():
306    global filename,daemon_mode,DEBUG_FILE
307    db = sqlite3.connect(filename)
308    cur= db.cursor()
309    if daemon_mode != True:
310        print "TOP 10:"
311        print "~~~~~~~~~~"
312        try:
313                cur.execute('select * from info order by n DESC limit 10;')
314                for x in cur.fetchall():
315                        print x[0] + " " + str(x[1]);
316        except:
317                print "No data available"
318    else:
319        fp_debug=open(DEBUG_FILE,"a")
320        fp_debug.write("TOP 10:\n~~~~~~~~~~\n")
321        try:
322                cur.execute('select * from info order by n DESC limit 10;')
323                for x in cur.fetchall():
324                        fp_debug.write(x[0] + " " + str(x[1])+"\n")
325        except:
326                fp_debug.write("No data available\n")
327        fp_debug.close()
328    db.close()
329#END def show_list_db():
330
331#
332# Calculates new name to store temporary database
333#
334def get_filename(LOGPATH=''):
335        global filename
336        l=[]
337        if (LOGPATH != ''):
338            LOGPATH=LOGPATH+'/'
339        #Load all alphabet into list
340        for k in list(string.ascii_lowercase):
341                l.append(k)
342
343        #Load all alphabet into list
344        for k in list(string.ascii_lowercase):
345
346                # Load all alphabet (with two characters) into list
347                for p in list(string.ascii_lowercase):
348                        l.append(k+p)
349        # l = {a,b,c...y,z,aa,ab...zy,zz)
350        # Get current date
351        today=date.today()
352
353        current=LOGPATH+'stats-'+str(today)+'.db'
354        ant=''
355        # Get the next index for apply to file
356        for x in l:
357                if not os.path.isfile(current):
358                        filename=LOGPATH+'stats-'+str(today)+ant+'.db'
359                        return LOGPATH+'stats-'+str(today)+x+'.db'
360                        break;
361                else:
362                        ant=x;
363                current=LOGPATH+'stats-'+str(today)+ant+'.db'
364#END def get_filename():
365
366#
367# Clear file with stored database
368#
369def clear_db():
370        global filename
371        debug('C',2)
372        db = sqlite3.connect(filename)
373        cur = db.cursor()
374        cur.execute('delete from info;')
375        db.commit()
376        db.close()
377#END def clear_db():
378
379#
380# Update temporary database and updates global LIST
381#
382def update_db():
383        debug('U',2)
384        global LIST,filename
385        #debug("updating "+filename);
386        # Temporary list
387        L2={}
388        # Open file to store temporary exec ocurrences
389        db = sqlite3.connect(filename)
390        cur= db.cursor()
391        try:
392                cur.execute('select * from info;')
393        except:
394                cur.execute('create table info (cmd varchar(100) primary key,n int(10));');
395                cur.execute('select * from info;')
396        # Load into L2 all database records
397        for x in cur.fetchall():
398                L2[x[0]]=x[1]
399        # Append to LIST all previously added database records
400        for x in L2:
401                if not LIST.has_key(x):
402                        LIST[x]=L2[x]
403        # Clear database records
404        cur.execute('delete from info;')
405        values=[]
406        # Write all records from LIST into file
407        for x in LIST:
408                values.append((x,str(LIST[x])))
409        cur.executemany('insert into info values (?,?)',values)
410        db.commit()
411        db.close()
412#END def update_db():
413
414def daemonize():
415        debug('Running daemon...')
416        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analytics')):
417                start()
418#END def daemonize():
419
420def start():
421        fp = open('/var/run/analytics.pid','w');
422        fp.write(str(os.getpid()))
423        fp.close()
424        # Add handlers for functions, close, show, change file / update db
425        signal.signal(signal.SIGINT,sig_handler)
426        signal.signal(signal.SIGUSR1,sig_handler_change)
427        signal.signal(signal.SIGUSR2,sig_handler_show)
428        # Sync data
429        update_db()
430
431        t=threading.Timer(1.0,runner)
432        # De-comment to allow stop threads inmediately
433        #t.daemon = True
434        t.start()
435
436        p1= subprocess.Popen(["tail","-F",logfilename],stdout=subprocess.PIPE)
437
438
439        while True:
440                line = p1.stdout.readline()
441                if(re.search('type=EXECVE',line)):
442                        m=re.findall('a[0-9]+="([^"]+)"',line)
443                        t=""
444                        for i in m:
445                                t+=i + " "
446                        add_item(t)
447#END def start():
448               
449###################
450###### MAIN #######
451###################
452
453# enable / disable show more running data
454DEBUG=0
455# Global list in mem to store results
456LIST={}
457TIMEOUT=300
458BDTIMEOUT=300
459# Counters for threads going to timeout
460ctimeout=0
461btimeout=0
462
463# Get configuration parameters
464config = ConfigParser.ConfigParser()
465config.read('/etc/lliurex-analytics/agent.cfg')
466
467LOGPATH = config.get('Agent','logpath');
468DEBUG_FILE=LOGPATH+"/analytics-debug.log"
469# Filename for temporally files in disk (sqlite)
470filename=''  #initializes as global when is called get_filename
471next_filename=get_filename(LOGPATH)
472
473server = config.get('Server','server');
474server_path = config.get('Server','server-path');
475
476# Max timeout commiting changes to file
477TIMEOUT = config.getint('Agent','timeout')
478
479# Max timeout commiting changes from file to database
480BDTIMEOUT = config.getint('Agent','bdtimeout')
481
482agent = config.get('Agent','user-agent')
483headers = {'user-agent': agent};
484url="http://"+server+"/"+server_path;
485
486daemon_mode = config.getboolean('Agent','daemon')
487logfilename = config.get('Audit','file')
488
489blacklistname = config.get('Audit','blacklist')
490
491BLACKLIST = [line.rstrip('\n') for line in open(blacklistname)]
492INTERPRETERS = config.get('Audit','interpreters').split(',')
493
494debug("filename="+filename+' next='+next_filename)
495
496if __name__ == '__main__':
497        if (logfilename == 'stdin'):
498                if (len(sys.argv) < 2):
499                        sys.stderr.write('Error\n')
500                        sys.exit(1)
501                else:       
502                        logfilename=sys.argv[1]
503        else:
504                if daemon_mode == True:
505                        daemonize()
506                else:
507                        start()
508else:
509        if (logfilename == 'stdin'):
510                sys.stderr.write('file=stdin not valid running as module\n')
511                sys.exit(1)
512
Note: See TracBrowser for help on using the repository browser.