source: lliurex-statistics/trunk/fuentes/lliurex-statistics.install/usr/sbin/analytics @ 3349

Last change on this file since 3349 was 3349, checked in by mabarracus, 3 years ago

Fix proxy usage in daemon mode

  • Property svn:executable set to *
File size: 15.0 KB
RevLine 
[2773]1#!/usr/bin/python
2
3import re
4import os
5import sys
6import signal
7import subprocess
8import threading
9import shlex
10import sqlite3
11from datetime import date
12import string
13import requests
14import json
15import ConfigParser
16import daemon
17import lockfile
[2948]18import xmlrpclib
[2773]19
20######### END CONFIG ###########
21
22
23#
24# Shows debug data
25#
26def debug(text='',type=1):
27    global DEBUG, daemon_mode, DEBUG_FILE
28    fp_debug=open(DEBUG_FILE,"a")
29    if DEBUG:
30        if type == 1:
31            if daemon_mode != True:
32                print str(text)
33            else:
34                fp_debug.write(str(text)+"\n")
35        else:
36            if daemon_mode != True:
37                sys.stdout.write(text)
38            else:
39                fp_debug.write(str(text))
40        sys.stdout.flush()
41    fp_debug.close()
42#END def debug(text='',type=1):
43
44#
45# Handler to manage interrupt properly, sync data before die when SIGINT
46#
47def sig_handler(sig,frame):
48    global t,LIST
49    debug("Aborting...")
50    try:
51        # Cancel threads
52        t.cancel()
53        # Sync data
54        update_db()
55        # Show stored data to stdout
56        show_list_db()
57    except:
58        sys.exit(1)
59    sys.exit(0)
60#END def sig_handler(sig,frame):
61
62#
63# Forces change file when SIGUSR1
64#
65def sig_handler_change(sig, frame):
66        change_file();
67
68#
69# Forces change file and update server database when SIGUSR1
70#
71def change_file():
72        global LIST,filename,next_filename,LOGPATH
73        # sync in-memory and temporary file data
74        update_db()
75        # clear in-memory list
76        LIST.clear()
77        # Check if permission flag is granted
[2948]78        if allow_send():
79            # Send data to master server
80            send_data()
[2962]81        else:
82            debug("Sending not allowed when try to change file")
[2773]83        # Get new temporary filename
84        next_filename=get_filename(LOGPATH)
85        # Sync data in-memory and temporary file data
86        update_db()
87        # Show curent data to stdout
88        show_list()
89#END def change_file():
90
[2948]91def check_server_acknowledge():
[2966]92        try:
93            c = xmlrpclib.ServerProxy("https://server:9779/")
94            return c.get_variable("","VariablesManager","STATS_ENABLED")
95        except:
96            return '0'
[2948]97#END def check_server_acknowledge()
98
99def check_local_acknowledge():
100        answer = None
101        if os.path.isfile('/etc/lliurex-analytics/status'):
102            fp = open('/etc/lliurex-analytics/status','r')
103            answer = fp.readline();
104            fp.close()
105        return answer.strip()
106
107#END check_local_acknowledge
108
109def allow_send():
110    answer = check_server_acknowledge()
111    if answer != '1':
112        answer = check_local_acknowledge()
113        try:
114            if answer == "yes":
[2962]115                debug("allow_send stats by check local acknowledge")
[2948]116                return True
117            else:
[2962]118                debug("deny allow_send stats by check server & local acknowledge")
[2948]119                return False
120        except Exception as e:
[2962]121            debug("deny allow_send stats by exception check server & local acknowledge")
[2948]122            return False
123    else:
[2962]124        debug("allow_send stats by check server acknowledge")
[2948]125        return True
126#END def allow_send()
127
[2773]128#
129# Sends data to server and clear all
130#
131def clean_and_send():
132        global LIST
133        # sync in-memory and temporary file data
134        update_db()
135        # clear in-memory list
136        LIST.clear()
[2948]137        if allow_send(): 
138            # Send data to master server
139            send_data()
[2963]140        else:
[2962]141            debug("Sending not allowed when try send file")
[2773]142        # Clear database file
143        clear_db()
144        # Sync data in-memory and temporary file data
145        update_db()
146#END def clean_and_send():
147
148#
149# Call show data when SIGINT2
150#
151def sig_handler_show(sig,frame):
152        debug("current filename "+filename)
153        show_list()
154        show_list_db()
155        global TIMEOUT
156        global BDTIMEOUT
157        global ctimeout
158        global btimeout
159        debug('TIMEOUT='+str(TIMEOUT)+',BDTIMEOUT='+str(BDTIMEOUT)+',ctimeout='+str(ctimeout)+',btimeout='+str(btimeout))
160#END def sig_handler_show(sig,frame):
161
162#
163# Global function called every second to run functions guided by timeout
164#
165def runner():
166        global t
167        global ctimeout
168        global btimeout
169        global TIMEOUT
170        global BDTIMEOUT
171
172        # Flush in-memory data to file db when expires timeout
173        if ctimeout > TIMEOUT:
174                update_db()
175                ctimeout=0
176        else:
177                ctimeout+=1
178
179        # Call send-data to server when expires dbtimeout
180        if btimeout > BDTIMEOUT:
181                clean_and_send();
182                btimeout=0
183        else:
184                btimeout+=1
185        debug('.',2)
186
187        # Reload 1 sec timeout
188        t=threading.Timer(1.0,runner)
189#        De-comment to allow stop threads inmediately
190#        t.daemon = True
191        t.start()
192#END def runner():
193
194#
195# Add item to in-memory LIST
196#
197def add_item(item):
198        global LIST,BLACKLIST,INTERPRETERS
199
200        # Split binary and parameters
201        #part=shlex.split(item)       
202        part=item.split(' ')
203        # Check binary name is correct
204        if ('/' in part[0]):
205                part[0]=part[0].split('/')[-1]         
206        # Check if binary name must be recorded
207        if (part[0] in BLACKLIST):
208                return
209        # Check if is a known interpreter
210        if (part[0] in INTERPRETERS):
211                part[0]=part[1]
212                # Check script name is correct
213                if ('/' in part[0]):
214                        part[0]=part[0].split('/')[-1]
215                # Check if it's a parameter
216                i=0
217                novalid=['-',':']
218                while (part[i][0] in novalid and i < len(part)):
219                        i+=1
220                if (i < len(part)):
221                        part[0]=part[i]
222                else:
223                        return
224        if ('/' in part[0]):
225                        part[0]=part[0].split('/')[-1]
226
227        # Check if binary name must be recorded
228        if (part[0] in BLACKLIST):
229                return
230
231        # Add or append to global list
232        if LIST.has_key(part[0]):
233                debug('+',2)
234                LIST[part[0]]+=1
235        else:
236                debug('*',2)
237                LIST[part[0]]=1
238#END def add_item(item):
239
240#
241# Show in-memory LIST and print to stdout
242#
243def show_list():
244        global LIST,daemon_mode,DEBUG_FILE
245        if daemon_mode != True:
246            print "Lista:"
247            print "~~~~~~~~~~"
248            for it in LIST:
249                print it + "->" + str(LIST[it])
250        else:
251            fp_debug=open(DEBUG_FILE,"a")
252            fp_debug.write("Lista:\n~~~~~~~\n");
253            for it in LIST:
254                fp_debug.write(it + "->" + str(LIST[it]) + "\n")
255            fp_debug.close()
256#END def show_list():
257
258#
259# Send data from temporary db to server using http post method
260#
261def send_data():
262    # Calculates mac address
263    f = open('/sys/class/net/eth0/address','r');
264    uid=f.read().strip();
265    f.close();
266
[3349]267    global filename,url,proxy,headers;
[2773]268   
269    db = sqlite3.connect(filename);
270    cur= db.cursor();
271    try:
272        cur.execute('select * from info order by n DESC limit 100;');
273        json_tmp={};
274        # Dumps temporary database building json object to send
275        for x in cur.fetchall():
276            json_tmp[x[0]]=str(x[1]);
277    except:
278        return;
279    # Get version and flavour
280    p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)
281    out,err=p.communicate()
282    version=out.strip();
283
284    p=subprocess.Popen(['lliurex-version','-v'],stdout=subprocess.PIPE)
285    out,err=p.communicate()
286    sabor=out.strip();
287
288    # Send json encoded data to server
289    jsonobj=json.dumps(json_tmp);
290    jsonobj=json.dumps({'uid':uid,'vers':version,'sab':sabor, 'stats': jsonobj });
291    payload = {'stats':jsonobj};
[2962]292    try:
293        debug("SENDING DATA:"+str(payload))
294    except Exception as e:
295        debug("Error debugging data to send")
[3349]296    #debug('proxy set to '+proxy)
297    p={}
298    p['http']=proxy
299    #debug('p='+str(p))
300    try:
301        if proxy != '':
302            r = requests.post(url,data=payload,headers=headers,proxies=p,timeout=5)
303        else:
304            r = requests.post(url,data=payload,headers=headers,timeout=5)
305        debug("S("+r.text+')',2);
306    except Exception as e:
307        debug("Error sending request: "+str(e))
[2773]308#END def send_data():
309
310#
311# Show list stored into db file
312#
313def show_list_db():
314    global filename,daemon_mode,DEBUG_FILE
315    db = sqlite3.connect(filename)
316    cur= db.cursor()
317    if daemon_mode != True:
318        print "TOP 10:"
319        print "~~~~~~~~~~"
320        try:
321                cur.execute('select * from info order by n DESC limit 10;')
322                for x in cur.fetchall():
323                        print x[0] + " " + str(x[1]);
324        except:
325                print "No data available"
326    else:
327        fp_debug=open(DEBUG_FILE,"a")
328        fp_debug.write("TOP 10:\n~~~~~~~~~~\n")
329        try:
330                cur.execute('select * from info order by n DESC limit 10;')
331                for x in cur.fetchall():
332                        fp_debug.write(x[0] + " " + str(x[1])+"\n")
333        except:
334                fp_debug.write("No data available\n")
335        fp_debug.close()
336    db.close()
337#END def show_list_db():
338
339#
340# Calculates new name to store temporary database
341#
342def get_filename(LOGPATH=''):
343        global filename
344        l=[]
345        if (LOGPATH != ''):
346            LOGPATH=LOGPATH+'/'
347        #Load all alphabet into list
348        for k in list(string.ascii_lowercase):
349                l.append(k)
350
351        #Load all alphabet into list
352        for k in list(string.ascii_lowercase):
353
354                # Load all alphabet (with two characters) into list
355                for p in list(string.ascii_lowercase):
356                        l.append(k+p)
357        # l = {a,b,c...y,z,aa,ab...zy,zz)
358        # Get current date
359        today=date.today()
360
361        current=LOGPATH+'stats-'+str(today)+'.db'
362        ant=''
363        # Get the next index for apply to file
364        for x in l:
365                if not os.path.isfile(current):
366                        filename=LOGPATH+'stats-'+str(today)+ant+'.db'
367                        return LOGPATH+'stats-'+str(today)+x+'.db'
368                        break;
369                else:
370                        ant=x;
371                current=LOGPATH+'stats-'+str(today)+ant+'.db'
372#END def get_filename():
373
374#
375# Clear file with stored database
376#
377def clear_db():
378        global filename
379        debug('C',2)
380        db = sqlite3.connect(filename)
381        cur = db.cursor()
382        cur.execute('delete from info;')
383        db.commit()
384        db.close()
385#END def clear_db():
386
387#
388# Update temporary database and updates global LIST
389#
390def update_db():
[3349]391        debug('U',1)
[2773]392        global LIST,filename
393        #debug("updating "+filename);
394        # Temporary list
395        L2={}
396        # Open file to store temporary exec ocurrences
397        db = sqlite3.connect(filename)
398        cur= db.cursor()
399        try:
400                cur.execute('select * from info;')
401        except:
402                cur.execute('create table info (cmd varchar(100) primary key,n int(10));');
403                cur.execute('select * from info;')
404        # Load into L2 all database records
405        for x in cur.fetchall():
406                L2[x[0]]=x[1]
407        # Append to LIST all previously added database records
408        for x in L2:
409                if not LIST.has_key(x):
410                        LIST[x]=L2[x]
411        # Clear database records
412        cur.execute('delete from info;')
413        values=[]
414        # Write all records from LIST into file
415        for x in LIST:
416                values.append((x,str(LIST[x])))
417        cur.executemany('insert into info values (?,?)',values)
418        db.commit()
419        db.close()
420#END def update_db():
421
422def daemonize():
423        debug('Running daemon...')
[3349]424        try:
425            with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analytics')):
[2773]426                start()
[3349]427        except Exception as e:
428            debug("Error daemonizing "+str(e))
[2773]429#END def daemonize():
430
431def start():
432        fp = open('/var/run/analytics.pid','w');
433        fp.write(str(os.getpid()))
434        fp.close()
435        # Add handlers for functions, close, show, change file / update db
436        signal.signal(signal.SIGINT,sig_handler)
437        signal.signal(signal.SIGUSR1,sig_handler_change)
438        signal.signal(signal.SIGUSR2,sig_handler_show)
439        # Sync data
440        update_db()
441
442        t=threading.Timer(1.0,runner)
443        # De-comment to allow stop threads inmediately
444        #t.daemon = True
445        t.start()
446
447        p1= subprocess.Popen(["tail","-F",logfilename],stdout=subprocess.PIPE)
448
449
450        while True:
451                line = p1.stdout.readline()
452                if(re.search('type=EXECVE',line)):
453                        m=re.findall('a[0-9]+="([^"]+)"',line)
454                        t=""
455                        for i in m:
456                                t+=i + " "
457                        add_item(t)
458#END def start():
459               
460###################
461###### MAIN #######
462###################
463
464# enable / disable show more running data
465DEBUG=0
466# Global list in mem to store results
467LIST={}
468TIMEOUT=300
469BDTIMEOUT=300
470# Counters for threads going to timeout
471ctimeout=0
472btimeout=0
473
474# Get configuration parameters
475config = ConfigParser.ConfigParser()
476config.read('/etc/lliurex-analytics/agent.cfg')
477
478LOGPATH = config.get('Agent','logpath');
479DEBUG_FILE=LOGPATH+"/analytics-debug.log"
480# Filename for temporally files in disk (sqlite)
481filename=''  #initializes as global when is called get_filename
482next_filename=get_filename(LOGPATH)
483
484server = config.get('Server','server');
485server_path = config.get('Server','server-path');
486
487# Max timeout commiting changes to file
488TIMEOUT = config.getint('Agent','timeout')
489
490# Max timeout commiting changes from file to database
491BDTIMEOUT = config.getint('Agent','bdtimeout')
492
493agent = config.get('Agent','user-agent')
494headers = {'user-agent': agent};
495url="http://"+server+"/"+server_path;
496
497daemon_mode = config.getboolean('Agent','daemon')
498logfilename = config.get('Audit','file')
499
500blacklistname = config.get('Audit','blacklist')
501
502BLACKLIST = [line.rstrip('\n') for line in open(blacklistname)]
503INTERPRETERS = config.get('Audit','interpreters').split(',')
504
[2966]505debug("filename="+filename+' next='+next_filename)
[3349]506try:
507     px= subprocess.Popen(["bash","-c","source /etc/profile && echo $http_proxy"],stdout=subprocess.PIPE)
508     proxy=px.stdout.readline().strip()
509     debug('Detected proxy: '+str(proxy))
510except Exception as e:
511     debug('Error getting proxy '+str(e))
[2773]512
513if __name__ == '__main__':
514        if (logfilename == 'stdin'):
515                if (len(sys.argv) < 2):
516                        sys.stderr.write('Error\n')
517                        sys.exit(1)
518                else:       
519                        logfilename=sys.argv[1]
520        else:
521                if daemon_mode == True:
522                        daemonize()
523                else:
524                        start()
525else:
526        if (logfilename == 'stdin'):
527                sys.stderr.write('file=stdin not valid running as module\n')
528                sys.exit(1)
529
Note: See TracBrowser for help on using the repository browser.