- Timestamp:
- Feb 14, 2018, 5:36:51 PM (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lliurex-statistics/trunk/fuentes/lliurex-statistics.install/usr/sbin/analytics
r6776 r6815 1 #!/usr/bin/python 2 1 #!/usr/bin/env python3 2 import sys 3 import os 3 4 import re 4 import os5 import sys6 5 import signal 6 import time 7 7 import subprocess 8 import threading9 import shlex10 import sqlite311 from datetime import date12 import string13 8 import requests 14 9 import json 15 import ConfigParser10 import configparser 16 11 import daemon 12 from xmlrpc import client 17 13 import lockfile 18 import xmlrpclib 19 import time 20 21 ######### END CONFIG ########### 22 14 import logging.handlers 15 from logging import config as cfg 16 import ssl 17 18 19 oldsignals = {} 20 for sig in signal.Signals: 21 try: 22 oldsignals.setdefault(sig.name, signal.getsignal(sig)) 23 signal.signal(sig.value, signal.SIG_IGN) 24 except: 25 continue 23 26 24 27 # 25 # S hows debug data28 # START EDITABLE VARS (OVERRIDES VALUES IN CONFIGFILE) 26 29 # 27 def debug(text='',type=1): 28 global DEBUG, daemon_mode, DEBUG_FILE 29 fp_debug=open(DEBUG_FILE,"a") 30 if DEBUG: 31 if type == 1: 32 if daemon_mode != True: 33 print str(text) 30 31 # DEBUG = 1 32 33 MODE = 'PROCEDURAL' 34 # MODE = 'THREADED' 35 36 CONFIGFILE = '/etc/lliurex-analytics/agent.cfg' 37 # CONFIGFILE = 'config.txt' 38 39 # OVERRIDE_SEND_PERMISSION = 1 40 41 # MIN_LOG_LEVEL = 'info' 42 43 # DAEMON_MODE = 1 44 45 # FILELOCK = '/tmp/analytics' 46 47 # PIDFILE = '/var/run/analitics.pid' 48 # PIDFILE = '/tmp/analitics.pid' 49 50 # STATUSFILE = '/etc/lliurex-analytics/status' 51 52 # TIMEOUT = 1 53 54 # 55 # END EDITABLE VARS # 56 # 57 58 59 if MODE == 'PROCEDURAL': 60 from multiprocessing import Process, Manager 61 str_formatter = '(%(processName)s)' 62 if MODE == 'THREADED': 63 from multiprocessing.dummy import Process, Manager 64 str_formatter = '(%(threadName)s)' 65 66 def get_var_value(varname, config=None, mode='string', section='Agent'): 67 value = None 68 69 if config: 70 varname = varname.lower() 71 try: 72 if mode == 'string': 73 value = config.get(section, varname) 74 elif mode == 'bool': 75 value = config.getboolean(section, varname) 76 elif mode == 'int': 77 value = config.getint(section, varname) 78 elif mode == 'float': 79 value = config.getfloat(section, varname) 80 except: 81 pass 82 83 def f(item): 84 if isinstance(item, str) or isinstance(item, bool) or isinstance(item, int) or isinstance(item, float): 85 return True 86 else: 87 return False 88 89 for x in (v for v in globals() if varname.lower() == v.lower() and f(globals()[v])): 90 value = globals()[x] 91 92 if mode == 'string': 93 return str(value) 94 elif mode == 'bool': 95 return bool(value) 96 elif mode == 'int': 97 return int(value) 98 elif mode == 'float': 99 return float(value) 100 else: 101 return value 102 103 104 if get_var_value('DEBUG', mode='bool'): 105 loglevel = get_var_value('MIN_LOG_LEVEL') 106 if loglevel: 107 if loglevel == 'debug' or loglevel == logging.DEBUG: 108 loglevel = logging.DEBUG 109 elif loglevel == 'critical' or loglevel == logging.CRITICAL: 110 loglevel = logging.CRITICAL 111 elif loglevel == 'error' or loglevel == logging.ERROR: 112 loglevel = logging.ERROR 113 elif loglevel == 'warning' or loglevel == logging.WARNING: 114 loglevel = logging.WARNING 115 elif loglevel == 'info' or loglevel == logging.INFO: 116 loglevel = logging.INFO 117 else: 118 loglevel = logging.DEBUG 119 else: 120 loglevel = logging.DEBUG 121 else: 122 loglevel = logging.INFO 123 124 125 LOGGING = { 126 'version': 1, 127 'disable_existing_loggers': False, 128 'formatters': { 129 'verbose': { 130 # 'format': '%(levelname)s %(module)s (%(pathname)s:%(lineno)d) ' + str_formatter + ' %(message)s' 131 'format': '%(levelname)s %(module)s ' + str_formatter + ' %(message)s' 132 }, 133 }, 134 'handlers': { 135 'stdout': { 136 'class': 'logging.StreamHandler', 137 'stream': sys.stdout, 138 'formatter': 'verbose', 139 }, 140 'sys-logger6': { 141 'class': 'logging.handlers.SysLogHandler', 142 'address': '/dev/log', 143 'facility': "local6", 144 'formatter': 'verbose', 145 }, 146 }, 147 'loggers': { 148 'analytics-logger': { 149 'handlers': ['sys-logger6', 'stdout'], 150 'level': loglevel, 151 'propagate': True, 152 }, 153 } 154 } 155 156 157 cfg.dictConfig(LOGGING) 158 log = logging.getLogger('analytics-logger') 159 160 161 def print_config(config): 162 global log 163 164 for sect in config: 165 for key in config[sect]: 166 log.debug("Config Loaded: '{}' '{}' '{}'".format(sect, key, config[sect][key])) 167 168 169 def init_config(): 170 171 config = configparser.ConfigParser() 172 config.read(CONFIGFILE) 173 174 return config 175 176 177 def init_logging(config): 178 global DEBUG, log 179 180 if get_var_value('DEBUG', config): 181 DEBUG = True 182 log.setLevel(loglevel) 183 print_config(config) 184 185 186 def bin_to_ascii(value): 187 global log 188 189 try: 190 if isinstance(value, bytes): 191 return value.decode('utf-8') 192 else: 193 return value 194 except Exception as e: 195 log.error('Error bin_to_ascii {}'.format(e)) 196 return value 197 198 199 def get_llx_version(): 200 global log 201 202 output = bin_to_ascii(subprocess.check_output(['lliurex-version', '-n'])) 203 release = output[0:2].strip() 204 if release == '15': 205 use = 'lliurex-detect' 206 else: 207 use = 'lliurex-version' 208 output = bin_to_ascii(subprocess.check_output([use, '-f'])) 209 flavour = output.strip() 210 log.info("Detected release:'{}' flavour:'{}'".format(release, flavour)) 211 return release, flavour 212 213 214 def detect_proxy(): 215 global log 216 217 px = subprocess.Popen(["bash", "-c", "source /etc/profile && echo $http_proxy"], stdout=subprocess.PIPE) 218 proxy = bin_to_ascii(px.stdout.readline()).strip() 219 log.info("Detected proxy: '{}'".format(proxy)) 220 return proxy 221 222 223 def daemonize(*args, **kwargs): 224 global glob, log, CONFIG 225 226 log.info('Running daemon mode...') 227 filelock = get_var_value('filelock', CONFIG) 228 if not filelock: 229 filelock = '/var/run/analytics' 230 231 try: 232 with daemon.DaemonContext(detach_process=True, 233 working_directory='/tmp', 234 umask=0o002, 235 pidfile=lockfile.FileLock(filelock), 236 files_preserve=[log.handlers[0].socket.fileno()]): 237 start(daemon_mode=True) 238 except Exception as e: 239 log.critical("Error daemonizing {}".format(e)) 240 sys.exit(1) 241 242 243 def add_item(item): 244 global glob, log 245 246 log.debug('Request to add {}'.format(item)) 247 248 parts_item = item.split(' ') 249 250 executable = None 251 not_valid = ['-', ':'] 252 for i in range(len(parts_item)): 253 executable = parts_item[i].strip() 254 if executable[0] in not_valid: 255 log.debug('Skipping malformed executable {}'.format(executable)) 256 executable = None 257 continue 258 if '/' in executable: 259 executable = executable.split('/')[-1] 260 log.debug('Trimming executable to {}'.format(executable)) 261 if executable in glob['INTERPRETERS']: 262 log.debug('Trimming interpreter part {}'.format(executable)) 263 executable = None 264 continue 265 else: 266 if executable in glob['BLACKLIST']: 267 log.debug('Skipping add due to blacklisted command {}'.format(executable)) 268 return None 269 log.debug('Valid executable {}'.format(executable)) 270 break 271 272 the_list = glob['LIST'] 273 if executable: 274 if executable in the_list: 275 the_list[executable] = the_list[executable] + 1 276 log.info('+++ Incrementing {} = {}'.format(executable, the_list[executable])) 277 else: 278 log.info('*** Adding {} = 1'.format(executable, 1)) 279 the_list[executable] = 1 280 glob['LIST'] = the_list 281 282 283 def monitor(): 284 global glob, log 285 286 log.info('Start monitor') 287 logfilename = get_var_value('file', glob['config'], section='Audit') 288 289 glob['BLACKLIST'] = get_var_value('blacklist', glob['config'], section='Audit') 290 glob['INTERPRETERS'] = get_var_value('interpreters', glob['config'], section='Audit') 291 292 try: 293 glob['INTERPRETERS'] = [x.strip() for x in glob['INTERPRETERS'].split(',')] 294 except Exception as e: 295 log.error('Malformed interpreters list ,{}'.format(e)) 296 glob['INTERPRETERS'] = [] 297 return None 298 299 try: 300 with open(glob['BLACKLIST'], 'r') as fp: 301 glob['BLACKLIST'] = [line.strip() for line in fp] 302 except Exception as e: 303 log.error('Unable to read blacklist from {} , {}'.format(glob['BLACKLIST'], e)) 304 glob['BLACKLIST'] = [] 305 return None 306 307 try: 308 if not (os.path.isfile(logfilename) and os.access(logfilename, os.R_OK)): 309 log.critical('File {} not readable'.format(logfilename)) 310 glob['TERMINATE'] = True 311 312 fp = subprocess.Popen(['tail', '-F', logfilename], stdout=subprocess.PIPE, stderr=open(os.devnull, 'w')) 313 except Exception as e: 314 log.critical('Error initializing {} read, {}'.format(logfilename, e)) 315 glob['TERMINATE'] = True 316 return None 317 318 try: 319 log.info('Starting monitoring {}'.format(logfilename)) 320 321 while not glob['TERMINATE']: 322 if fp.poll() is not None: 323 log.error('Dead subprocess monitoring {}'.format(logfilename)) 324 fp = subprocess.Popen(['tail', '-F', logfilename], stdout=subprocess.PIPE, stderr=open(os.devnull, 'w')) 34 325 else: 35 fp_debug.write(str(text)+"\n") 36 else: 37 if daemon_mode != True: 38 sys.stdout.write(text) 326 line = bin_to_ascii(fp.stdout.readline()).strip() 327 if re.search('type=EXECVE', line): 328 m = re.findall('a[0-9]+="([^"]+)"', line) 329 if m: 330 captured = ' '.join(m) 331 add_item(captured) 332 333 except Exception as e: 334 if isinstance(e, ConnectionResetError): 335 log.info('Connection reset exitting monitor thread') 336 glob['TERMINATE'] = True 337 return 338 else: 339 log.error('Error reading file {}, {}'.format(logfilename, e)) 340 glob['TERMINATE'] = True 341 return 342 343 log.info('Exitting monitor thread') 344 return 345 346 def update_list(): 347 global glob, log 348 349 log.info('Start update list') 350 try: 351 list_path = get_var_value('list_path', glob['config'], mode='string', section='Server') 352 server = get_var_value('server', glob['config'], mode='string', section='Server') 353 url = 'http://' + server + '/' + list_path 354 agent = glob['user_agent'] 355 headers = {'user-agent': agent} 356 except Exception as e: 357 log.warning('Error gettting update list settings {}'.format(e)) 358 359 log.debug('List path {}'.format(url)) 360 361 tick = 1 362 timeout = 60 * 60 * 12 363 c = 10 364 365 while not glob['TERMINATE']: 366 time.sleep(tick) 367 if c > 0: 368 c = c - tick 369 else: 370 c = timeout 371 372 sent = False 373 rq = None 374 375 try: 376 if glob['use_proxy']: 377 proxy_obj = dict() 378 proxy_obj.setdefault('http', glob['proxy']) 379 380 rq = requests.get(url, headers=headers, proxies=proxy_obj, timeout=5) 381 sent = True 382 else: 383 rq = requests.get(url, headers=headers, timeout=5) 384 sent = True 385 except Exception as e: 386 log.warning('Error getting list from {}, {}'.format(url,e)) 387 388 try: 389 blist = glob['BLACKLIST'] 390 except Exception as e: 391 log.error('Error loading current blacklist on update_list, {}'.format(e)) 392 393 try: 394 the_list = glob['LIST'] 395 except Exception as e: 396 log.error('Error loading current applist on update_list, {}'.format(e)) 397 398 if sent and rq: 399 result = rq.text 400 try: 401 json_list = json.loads(result) 402 except Exception as e: 403 log.warning('Wrong list received {}, {}'.format(result,e)) 404 continue 405 406 try: 407 for item in json_list: 408 if item not in blist: 409 blist.append(item) 410 log.info("Received item list '{}'".format(item)) 411 if item in the_list: 412 del the_list[item] 413 log.info("Removed item from list '{}'".format(item)) 414 glob['BLACKLIST'] = blist 415 glob['LIST'] = the_list 416 except Exception as e: 417 log.error('Error updating blacklist, {}'.format(e)) 39 418 else: 40 fp_debug.write(str(text)) 41 sys.stdout.flush() 42 fp_debug.close() 43 #END def debug(text='',type=1): 44 45 # 46 # Handler to manage interrupt properly, sync data before die when SIGINT 47 # 48 def sig_handler(sig,frame): 49 global t,LIST,RUNNER 50 debug("Aborting...") 51 RUNNER=0 52 try: 53 # Cancel threads 54 t.cancel() 55 # Sync data 56 update_db() 57 # Show stored data to stdout 58 show_list_db() 419 log.warning('Unable to get list data') 420 421 log.info('Exitting update list thread') 422 423 424 def timed_send(): 425 global glob, log 426 427 log.debug('Start timed_send ') 428 try: 429 count = get_var_value('timeout', glob['config'], mode='int') 430 if count < 0: 431 log.warning('Not valid timeout value setting default 300') 432 except Exception as e: 433 log.warning('Unable to read timeout value defaulting to 300, {}'.format(e)) 434 count = 300 435 436 log.info('Initialized timed send with value {} seconds'.format(count)) 437 c = count 438 tick = 0.2 439 try: 440 while not glob['TERMINATE']: 441 while glob['PRINTING'] == True: 442 time.sleep(1) 443 time.sleep(tick) 444 if c > 0: 445 c = c - tick 446 else: 447 c = count 448 log.debug('Triggering timed send') 449 clean_and_send() 450 451 except Exception as e: 452 if isinstance(e, ConnectionResetError): 453 log.info('Connection reset exitting timer thread') 454 else: 455 log.error('Error with timed send, {}'.format(e)) 456 457 log.info('Exitting timer thread') 458 return 459 460 461 def start(*args, daemon_mode=False, **kwargs): 462 global THREADS, oldsignals, log, CONFIG, glob 463 464 log.info("Starting analytics") 465 466 mgr = Manager() 467 glob = mgr.dict() 468 469 glob['DAEMON_MODE'] = daemon_mode 470 glob['config'] = CONFIG 471 472 try: 473 glob['release'], glob['flavour'] = get_llx_version() 474 except Exception as e: 475 log.error('Error getting llx version {}'.format(e)) 476 glob['release'] = 'Unknown' 477 glob['flavour'] = 'Unknown' 478 479 try: 480 glob['proxy'] = detect_proxy() 481 if glob['proxy'] == '': 482 log.info('Not using proxy') 483 glob['use_proxy'] = False 484 else: 485 glob['use_proxy'] = True 486 except Exception as e: 487 log.warning('Error detecting proxy {}'.format(e)) 488 glob['use_proxy'] = False 489 490 pidfile = get_var_value('pidfile', glob['config']) 491 492 try: 493 server = get_var_value('server', glob['config'], section='Server') 494 server_path = get_var_value('server-path', glob['config'], section='Server') 495 if server.strip() == '' or server_path.strip() == '': 496 raise Exception('Empty server or server-path') 497 glob['server'] = server 498 glob['server_path'] = server_path 499 except Exception as e: 500 log.critical('Error getting server url, {}'.format(e)) 501 502 try: 503 agent = get_var_value('user-agent', glob['config']) 504 if agent.strip() == '' or agent == 'None': 505 agent = 'lliurex-analytics-agent' 506 glob['user_agent'] = agent 507 except Exception as e: 508 log.warning('Error getting user-agent, {}'.format(e)) 509 510 # write pid 511 try: 512 with open(pidfile, 'w') as fp: 513 fp.write(str(os.getpid())) 514 except Exception as e: 515 log.error('Error writting pidfile {}'.format(e)) 516 517 glob['TERMINATE'] = False 518 glob['PRINTING'] = False 519 glob['LIST'] = {} 520 521 glob['platform_data'] = get_platform_data() 522 523 THREADS = dict() 524 THREADS['monitor'] = Process(target=monitor, name='monitor') 525 THREADS['monitor'].daemon = glob['DAEMON_MODE'] 526 527 THREADS['timed_send'] = Process(target=timed_send, name='timed_send') 528 THREADS['timed_send'].daemon = glob['DAEMON_MODE'] 529 530 THREADS['update_list'] = Process(target=update_list, name='update_list') 531 THREADS['update_list'].daemon = glob['DAEMON_MODE'] 532 533 THREADS['monitor'].start() 534 THREADS['timed_send'].start() 535 THREADS['update_list'].start() 536 537 signals = {'SIGTERM': interrupt, 'SIGINT': interrupt, 'SIGUSR1': clean_and_send, 'SIGUSR2': show_captured} 538 for sig in oldsignals: 539 if sig in signals: 540 signal.signal(signal.__dict__[sig], signals[sig]) 541 else: 542 try: 543 signal.signal(signal.__dict__[sig], oldsignals[sig]) 544 except: 545 continue 546 547 548 def clean_and_send(*args, **kwargs): 549 global glob, log 550 551 override_send_permission = get_var_value('override_send_permission', glob['config'], mode='bool') 552 553 if allow_send() or override_send_permission: 554 send_data(glob['LIST']) 555 else: 556 log.info('Sending not allowed when try to send results') 557 glob['LIST'] = {} 558 559 560 def get_mac(): 561 global log 562 563 dirmac = '/sys/class/net' 564 eth = 'eth0' 565 filemac = 'address' 566 file = '{}/{}/{}'.format(dirmac, eth, filemac) 567 uid = None 568 try: 569 with open(file, 'r') as fp: 570 uid = bin_to_ascii(fp.read()).strip() 59 571 except: 572 log.warning('Unable to read {}'.format(file)) 573 eth = sorted(os.listdir(dirmac)) 574 if len(eth) > 0: 575 eth = eth[0] 576 file = '{}/{}/{}'.format(dirmac, eth, filemac) 577 try: 578 with open(file, 'r') as fp: 579 uid = bin_to_ascii(fp.read()).strip() 580 except Exception as e: 581 log.error('Unable to read mac address, {}'.format(e)) 582 583 return str(uid) 584 585 586 def get_cpu(): 587 global log 588 589 file = '/proc/cpuinfo' 590 cpu = {} 591 try: 592 with open(file, 'r') as fp: 593 for line in fp: 594 if re.search('^processor\s+:\s+([0-9]+)$', line): 595 m = re.findall('^processor\s+:\s+([0-9]+)', line) 596 if m and len(m) > 0: 597 cpu['ncpus'] = int(m[0]) + 1 598 if re.search('^model name\s+:\s+(.+)$', line): 599 m = re.findall('^model name\s+:\s+(.+)$', line) 600 if m and len(m) > 0: 601 cpu['model'] = str(m[0]) 602 except Exception as e: 603 log.warning('Unable to read cpuinfo, {}'.format(e)) 604 cpu = None 605 return str(cpu) 606 607 608 def get_mem(): 609 global log 610 611 file = '/proc/meminfo' 612 mem = None 613 614 try: 615 with open(file, 'r') as fp: 616 for line in fp: 617 if re.search('^MemTotal:\s+([0-9]+)\s+\S+$', line): 618 m = re.findall('^MemTotal:\s+([0-9]+)\s+\S+$', line) 619 if m and len(m) > 0: 620 mem = int(m[0]) 621 break 622 except Exception as e: 623 log.warning('Unable to read meminfo, {}'.format(e)) 624 mem = None 625 return str(mem) 626 627 628 def get_vga(): 629 global log 630 631 vga = None 632 try: 633 out = bin_to_ascii(subprocess.check_output(['lspci'])).split('\n') 634 for line in out: 635 line_strip = line.strip() 636 if re.search('VGA', line_strip, re.IGNORECASE): 637 m = re.findall('^\S+\s(.+)$', line_strip) 638 if m and len(m) > 0: 639 vga = m[0] 640 break 641 except Exception as e: 642 log.warning('Unable to read pciinfo, {}'.format(e)) 643 vga = None 644 return str(vga) 645 646 647 def get_arch(): 648 global log 649 650 arch = None 651 try: 652 arch = bin_to_ascii(subprocess.check_output(['uname', '-m'])).strip() 653 except Exception as e: 654 log.warning('Unable to read architecture, {}'.format(e)) 655 arch = None 656 return str(arch) 657 658 659 def get_platform_data(): 660 global log 661 662 data = {} 663 data.setdefault('mac', get_mac()) 664 data.setdefault('cpu', get_cpu()) 665 data.setdefault('mem', get_mem()) 666 data.setdefault('vga', get_vga()) 667 data.setdefault('arch', get_arch()) 668 669 log.debug("Detected mac='{}' arch='{}' cpu='{}' mem='{}' vga='{}'".format(data['mac'], data['arch'], data['cpu'], data['mem'], data['vga'])) 670 return data 671 672 673 def send_data(data): 674 global log, glob 675 676 log.debug('sending specs {}'.format(glob['platform_data'])) 677 log.debug('sending data {}'.format(glob['LIST'])) 678 679 agent = glob['user_agent'] 680 url = 'http://' + glob['server'] + '/' + glob['server_path'] 681 headers = {'user-agent': agent} 682 683 version = glob['release'] 684 flavour = glob['flavour'] 685 686 list_data = data 687 try: 688 json_list_data = json.dumps(list_data) 689 except Exception as e: 690 log.error('Json error on internal data list') 691 return None 692 693 platform_data = glob['platform_data'] 694 uid = platform_data['mac'] 695 696 data_to_send = dict() 697 data_to_send.setdefault('uid', uid) 698 data_to_send.setdefault('vers', version) 699 data_to_send.setdefault('sab', flavour) 700 data_to_send.setdefault('specs', platform_data) 701 data_to_send.setdefault('stats', json_list_data) 702 703 try: 704 json_data_to_send = json.dumps(data_to_send) 705 except Exception as e: 706 log.error('Json error on data to send') 707 return None 708 709 payload = {'stats': json_data_to_send} 710 log.debug('Payload to send: {}'.format(payload)) 711 712 sent = False 713 rq = None 714 if glob['use_proxy']: 715 proxy_obj = dict() 716 proxy_obj.setdefault('http', glob['proxy']) 717 try: 718 rq = requests.post(url, data=payload, headers=headers, proxies=proxy_obj, timeout=5) 719 sent = True 720 except Exception as e: 721 log.error('Error sending data through proxy, {}'.format(e)) 722 723 if not glob['use_proxy'] or sent == False: 724 try: 725 rq = requests.post(url, data=payload, headers=headers, timeout=5) 726 sent = True 727 except Exception as e: 728 log.error('Error sending data, {}'.format(e)) 729 730 if sent and rq: 731 result = rq.text 732 result = result.strip().lower() 733 if result == 'ok': 734 log.debug('Sending was success with reply OK ') 735 elif result == 'nok': 736 log.info('Sending was success but reply is NOK ') 737 else: 738 log.warning("Sending was success but reply is unknown '{}'".format(result)) 739 else: 740 log.warning('Unable to send data') 741 742 743 def interrupt(*args, **kwargs): 744 global glob, log, THREADS 745 746 log.info('Interrupting analytics') 747 try: 748 clean_and_send() 749 try: 750 glob['TERMINATE'] = True 751 except: 752 log.error('Requested kill the program') 753 sys.exit(1) 754 for x in THREADS: 755 THREADS[x].join() 756 757 except Exception as e: 758 log.error('Error while interrupting, {}'.format(e)) 759 760 761 def show_captured(*args, **kwargs): 762 global glob, log 763 764 glob['PRINTING'] = True 765 log.info('Requested to show list') 766 767 list_items = glob['LIST'] 768 if not isinstance(list_items, dict): 769 log.warning('Error showing captured items, LIST is not a dictionary') 770 771 listkeys_sorted = sorted(list_items, key=list_items.get, reverse=True) 772 773 if len(listkeys_sorted) > 0: 774 log.info('analytics is showing currently capture list in memory') 775 for i in listkeys_sorted: 776 log.info('{} = {}'.format(i, list_items.get(i))) 777 else: 778 log.info('analytics detect an empty capture list in memory') 779 780 glob['PRINTING'] = False 781 782 783 def check_server_acknowledge(): 784 global log 785 786 try: 787 c = client.ServerProxy("https://server:9779/", 788 verbose=False, 789 use_datetime=True, 790 context=ssl._create_unverified_context()) 791 return c.get_variable("", "VariablesManager", "STATS_ENABLED") 792 except Exception as e: 793 log.error('Error getting variables, {}'.format(e)) 794 return None 795 796 797 def check_local_acknowledge(): 798 global glob, log 799 800 if glob['TERMINATE']: 801 return None 802 803 try: 804 statusfile = get_var_value('statusfile', glob['config']) 805 if str(statusfile) == 'None': 806 statusfile = '/etc/lliurex-analytics/status' 807 log.warning('Warning statusfile not set, defaulting to {}'.format(statusfile)) 808 except Exception as e: 809 log.error('Error getting value for statusfile, {}'.format(e)) 810 811 answer = None 812 try: 813 814 if os.path.isfile(statusfile): 815 fp = open(statusfile, 'r') 816 answer = fp.readline() 817 fp.close() 818 else: 819 log.error('wrong statusfile {}'.format(statusfile)) 820 return None 821 822 return answer.strip() 823 except Exception as e: 824 log.warning('Error reading status file, {}'.format(e)) 825 return None 826 827 828 def allow_send(): 829 global glob, log 830 831 if glob['TERMINATE']: 832 return False 833 834 if glob['flavour'].lower() == 'server': 835 answer = str(check_server_acknowledge()) 836 answer = answer.strip() 837 if answer == '1': 838 log.info('Allowed to send stats checking server acknowledge') 839 return True 840 elif answer == '0': 841 log.info('Denied to send stats checking server acknowledge') 842 return False 843 elif answer == 'None': 844 pass 845 else: 846 log.info('Unknown value checking server acknowledge, {}'.format(answer)) 847 answer = str(check_local_acknowledge()).lower() 848 answer = answer.strip() 849 if answer == 'yes': 850 log.info('Allowed to send stats checking local acknowledge') 851 return True 852 elif answer == 'no': 853 log.info('Denied to send stats checking local acknowledge') 854 return False 855 elif answer == '': 856 pass 857 else: 858 log.info('Unknown value checking local acknowledge, {}'.format(answer)) 859 860 log.info('Denied to send stats by default') 861 return False 862 863 864 if __name__ == '__main__': 865 866 try: 867 CONFIG = init_config() 868 except Exception as e: 869 print('Error initializing config analytics {}'.format(e), file=sys.stderr) 60 870 sys.exit(1) 871 try: 872 init_logging(CONFIG) 873 except Exception as e: 874 print('Error initializing logging analytics {}'.format(e), file=sys.stderr) 875 sys.exit(1) 876 877 THREADS = {} 878 879 DAEMON_MODE = get_var_value('DAEMON_MODE', CONFIG, 'bool') 880 881 if DAEMON_MODE: 882 daemonize(daemon_mode=True) 883 else: 884 start(daemon_mode=False) 885 886 log.debug('End main') 887 ended = False 888 while not ended: 889 for t in THREADS: 890 THREADS[t].join() 891 if THREADS[t].is_alive(): 892 ended = False 893 break 894 else: 895 ended = True 896 continue 897 898 log.info('Exitting analytics') 61 899 sys.exit(0) 62 #END def sig_handler(sig,frame):63 64 #65 # Forces change file when SIGUSR166 #67 def sig_handler_change(sig, frame):68 change_file();69 70 #71 # Forces change file and update server database when SIGUSR172 #73 def change_file():74 global LIST,filename,next_filename,LOGPATH75 # sync in-memory and temporary file data76 update_db()77 # clear in-memory list78 LIST.clear()79 # Check if permission flag is granted80 if allow_send():81 # Send data to master server82 send_data()83 else:84 debug("Sending not allowed when try to change file")85 # Get new temporary filename86 next_filename=get_filename(LOGPATH)87 # Sync data in-memory and temporary file data88 update_db()89 # Show curent data to stdout90 show_list()91 #END def change_file():92 93 def check_server_acknowledge():94 try:95 c = xmlrpclib.ServerProxy("https://server:9779/")96 return c.get_variable("","VariablesManager","STATS_ENABLED")97 except:98 return 'None'99 #END def check_server_acknowledge()100 101 def check_local_acknowledge():102 answer = None103 if os.path.isfile('/etc/lliurex-analytics/status'):104 fp = open('/etc/lliurex-analytics/status','r')105 answer = fp.readline();106 fp.close()107 return answer.strip()108 109 #END check_local_acknowledge110 111 def allow_send():112 global SABOR113 114 if SABOR.lower() != 'server':115 answer = check_server_acknowledge()116 if answer == '1':117 debug("allow_send stats by check server acknowledge")118 return True119 if answer == '0':120 debug("deny allow_send stats by check server acknowledge")121 return False122 123 answer = check_local_acknowledge()124 if answer == "yes":125 debug("allow_send stats by check local acknowledge")126 return True127 if answer == 'no':128 debug("deny allow_send by check local acknowledge")129 return False130 131 debug("deny allow_send stats by exception check server & local acknowledge")132 return False133 134 #END def allow_send()135 136 #137 # Sends data to server and clear all138 #139 def clean_and_send():140 global LIST141 # sync in-memory and temporary file data142 update_db()143 # clear in-memory list144 LIST.clear()145 if allow_send():146 # Send data to master server147 send_data()148 else:149 debug("Sending not allowed when try send file")150 # Clear database file151 clear_db()152 # Sync data in-memory and temporary file data153 update_db()154 #END def clean_and_send():155 156 #157 # Call show data when SIGINT2158 #159 def sig_handler_show(sig,frame):160 debug("current filename "+filename)161 show_list()162 show_list_db()163 global TIMEOUT164 global BDTIMEOUT165 global ctimeout166 global btimeout167 debug('TIMEOUT='+str(TIMEOUT)+',BDTIMEOUT='+str(BDTIMEOUT)+',ctimeout='+str(ctimeout)+',btimeout='+str(btimeout))168 #END def sig_handler_show(sig,frame):169 170 #171 # Global function called every second to run functions guided by timeout172 #173 def runner():174 global t175 global ctimeout176 global btimeout177 global TIMEOUT178 global BDTIMEOUT179 global RUNNER180 181 while RUNNER == 1:182 # Flush in-memory data to file db when expires timeout183 if ctimeout > TIMEOUT:184 update_db()185 ctimeout=0186 else:187 ctimeout+=1188 189 # Call send-data to server when expires dbtimeout190 if btimeout > BDTIMEOUT:191 clean_and_send();192 btimeout=0193 else:194 btimeout+=1195 debug('.',2)196 time.sleep(1.0)197 # Reload 1 sec timeout198 #t=threading.Timer(1.0,runner)199 # De-comment to allow stop threads inmediately200 # t.daemon = True201 #t.start()202 #END def runner():203 204 #205 # Add item to in-memory LIST206 #207 def add_item(item):208 global LIST,BLACKLIST,INTERPRETERS209 210 # Split binary and parameters211 #part=shlex.split(item)212 part=item.split(' ')213 # Check binary name is correct214 if ('/' in part[0]):215 part[0]=part[0].split('/')[-1]216 # Check if binary name must be recorded217 if (part[0] in BLACKLIST):218 return219 # Check if is a known interpreter220 if (part[0] in INTERPRETERS):221 part[0]=part[1]222 # Check script name is correct223 if ('/' in part[0]):224 part[0]=part[0].split('/')[-1]225 # Check if it's a parameter226 i=0227 novalid=['-',':']228 while (part[i][0] in novalid and i < len(part)):229 i+=1230 if (i < len(part)):231 part[0]=part[i]232 else:233 return234 if ('/' in part[0]):235 part[0]=part[0].split('/')[-1]236 237 # Check if binary name must be recorded238 if (part[0] in BLACKLIST):239 return240 241 # Add or append to global list242 if LIST.has_key(part[0]):243 debug('+',2)244 LIST[part[0]]+=1245 else:246 debug('*',2)247 LIST[part[0]]=1248 #END def add_item(item):249 250 #251 # Show in-memory LIST and print to stdout252 #253 def show_list():254 global LIST,daemon_mode,DEBUG_FILE255 if daemon_mode != True:256 print "Lista:"257 print "~~~~~~~~~~"258 for it in LIST:259 print it + "->" + str(LIST[it])260 else:261 fp_debug=open(DEBUG_FILE,"a")262 fp_debug.write("Lista:\n~~~~~~~\n");263 for it in LIST:264 fp_debug.write(it + "->" + str(LIST[it]) + "\n")265 fp_debug.close()266 #END def show_list():267 268 #269 # Send data from temporary db to server using http post method270 #271 def send_data():272 global SABOR273 global RELEASE274 275 # Calculates mac address276 f = open('/sys/class/net/eth0/address','r');277 uid=f.read().strip();278 f.close();279 280 global filename,url,proxy,headers,SEND_ITEMS;281 282 db = sqlite3.connect(filename);283 cur= db.cursor();284 try:285 cur.execute('select * from info order by n DESC limit {};'.format(SEND_ITEMS));286 json_tmp={};287 # Dumps temporary database building json object to send288 for x in cur.fetchall():289 json_tmp[x[0]]=str(x[1]);290 except:291 return;292 293 # Get version and flavour294 #p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)295 #out,err=p.communicate()296 #version=out.strip();297 version=RELEASE298 299 #p=subprocess.Popen(['lliurex-version','-f'],stdout=subprocess.PIPE)300 #out,err=p.communicate()301 #sabor=out.strip();302 sabor=SABOR303 304 # Send json encoded data to server305 jsonobj=json.dumps(json_tmp);306 jsonobj=json.dumps({'uid':uid,'vers':version,'sab':sabor, 'stats': jsonobj });307 payload = {'stats':jsonobj};308 try:309 debug("SENDING DATA:"+str(payload))310 except Exception as e:311 debug("Error debugging data to send")312 #debug('proxy set to '+proxy)313 p={}314 p['http']=proxy315 #debug('p='+str(p))316 try:317 if proxy != '':318 r = requests.post(url,data=payload,headers=headers,proxies=p,timeout=5)319 else:320 r = requests.post(url,data=payload,headers=headers,timeout=5)321 debug("S("+r.text+')',2);322 except Exception as e:323 debug("Error sending request: "+str(e))324 #END def send_data():325 326 #327 # Show list stored into db file328 #329 def show_list_db():330 global filename,daemon_mode,DEBUG_FILE,SEND_ITEMS331 db = sqlite3.connect(filename)332 cur= db.cursor()333 if daemon_mode != True:334 print "TOP {}:".format(SEND_ITEMS)335 print "~~~~~~~~~~"336 try:337 cur.execute('select * from info order by n DESC limit {};'.format(SEND_ITEMS))338 for x in cur.fetchall():339 print x[0] + " " + str(x[1]);340 except:341 print "No data available"342 else:343 fp_debug=open(DEBUG_FILE,"a")344 fp_debug.write("TOP 10:\n~~~~~~~~~~\n")345 try:346 cur.execute('select * from info order by n DESC limit {};'.format(SEND_ITEMS))347 for x in cur.fetchall():348 fp_debug.write(x[0] + " " + str(x[1])+"\n")349 except:350 fp_debug.write("No data available\n")351 fp_debug.close()352 db.close()353 #END def show_list_db():354 355 #356 # Calculates new name to store temporary database357 #358 def get_filename(LOGPATH=''):359 global filename360 l=[]361 if (LOGPATH != ''):362 LOGPATH=LOGPATH+'/'363 #Load all alphabet into list364 for k in list(string.ascii_lowercase):365 l.append(k)366 367 #Load all alphabet into list368 for k in list(string.ascii_lowercase):369 370 # Load all alphabet (with two characters) into list371 for p in list(string.ascii_lowercase):372 l.append(k+p)373 # l = {a,b,c...y,z,aa,ab...zy,zz)374 # Get current date375 today=date.today()376 377 current=LOGPATH+'stats-'+str(today)+'.db'378 ant=''379 # Get the next index for apply to file380 for x in l:381 if not os.path.isfile(current):382 filename=LOGPATH+'stats-'+str(today)+ant+'.db'383 return LOGPATH+'stats-'+str(today)+x+'.db'384 break;385 else:386 ant=x;387 current=LOGPATH+'stats-'+str(today)+ant+'.db'388 #END def get_filename():389 390 #391 # Clear file with stored database392 #393 def clear_db():394 global filename395 debug('C',2)396 db = sqlite3.connect(filename)397 cur = db.cursor()398 cur.execute('delete from info;')399 db.commit()400 db.close()401 #END def clear_db():402 403 #404 # Update temporary database and updates global LIST405 #406 def update_db():407 debug('U',1)408 global LIST,filename409 #debug("updating "+filename);410 # Temporary list411 L2={}412 # Open file to store temporary exec ocurrences413 db = sqlite3.connect(filename)414 cur= db.cursor()415 try:416 cur.execute('select * from info;')417 except:418 cur.execute('create table info (cmd varchar(100) primary key,n int(10));');419 cur.execute('select * from info;')420 # Load into L2 all database records421 for x in cur.fetchall():422 L2[x[0]]=x[1]423 # Append to LIST all previously added database records424 for x in L2:425 if not LIST.has_key(x):426 LIST[x]=L2[x]427 # Clear database records428 cur.execute('delete from info;')429 values=[]430 # Write all records from LIST into file431 for x in LIST:432 values.append((x,str(LIST[x])))433 cur.executemany('insert into info values (?,?)',values)434 db.commit()435 db.close()436 #END def update_db():437 438 def daemonize():439 debug('Running daemon...')440 try:441 with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analytics')):442 start()443 except Exception as e:444 debug("Error daemonizing "+str(e))445 #END def daemonize():446 447 def start():448 fp = open('/var/run/analytics.pid','w');449 fp.write(str(os.getpid()))450 fp.close()451 # Add handlers for functions, close, show, change file / update db452 signal.signal(signal.SIGINT,sig_handler)453 signal.signal(signal.SIGUSR1,sig_handler_change)454 signal.signal(signal.SIGUSR2,sig_handler_show)455 # Sync data456 update_db()457 458 t=threading.Timer(1.0,runner)459 # De-comment to allow stop threads inmediately460 #t.daemon = True461 t.start()462 463 p1= subprocess.Popen(["tail","-F",logfilename],stdout=subprocess.PIPE)464 465 466 while True:467 line = p1.stdout.readline()468 if(re.search('type=EXECVE',line)):469 m=re.findall('a[0-9]+="([^"]+)"',line)470 t=""471 for i in m:472 t+=i + " "473 add_item(t)474 #END def start():475 476 ###################477 ###### MAIN #######478 ###################479 480 # enable / disable show more running data481 DEBUG=0482 # Global list in mem to store results483 LIST={}484 TIMEOUT=300485 BDTIMEOUT=300486 SEND_ITEMS=100487 # Counters for threads going to timeout488 ctimeout=0489 btimeout=0490 RUNNER=1491 492 # Get configuration parameters493 config = ConfigParser.ConfigParser()494 config.read('/etc/lliurex-analytics/agent.cfg')495 496 LOGPATH = config.get('Agent','logpath');497 DEBUG_FILE=LOGPATH+"/analytics-debug.log"498 # Filename for temporally files in disk (sqlite)499 filename='' #initializes as global when is called get_filename500 next_filename=get_filename(LOGPATH)501 502 server = config.get('Server','server');503 server_path = config.get('Server','server-path');504 505 # Max timeout commiting changes to file506 TIMEOUT = config.getint('Agent','timeout')507 508 # Max timeout commiting changes from file to database509 BDTIMEOUT = config.getint('Agent','bdtimeout')510 511 agent = config.get('Agent','user-agent')512 headers = {'user-agent': agent};513 url="http://"+server+"/"+server_path;514 515 daemon_mode = config.getboolean('Agent','daemon')516 logfilename = config.get('Audit','file')517 518 blacklistname = config.get('Audit','blacklist')519 520 BLACKLIST = [line.rstrip('\n') for line in open(blacklistname)]521 INTERPRETERS = config.get('Audit','interpreters').split(',')522 523 debug("filename="+filename+' next='+next_filename)524 525 p=subprocess.Popen(['lliurex-version','-n'],stdout=subprocess.PIPE)526 RELEASE,err=p.communicate()527 RELEASE=RELEASE.strip()528 if RELEASE[0:2] == '15':529 VERSION_GETTER='lliurex-detect'530 else:531 VERSION_GETTER='lliurex-version'532 533 534 p=subprocess.Popen([VERSION_GETTER,'-f'],stdout=subprocess.PIPE)535 SABOR,err=p.communicate()536 SABOR=SABOR.strip()537 538 debug('Starting on '+SABOR+' '+RELEASE)539 try:540 px= subprocess.Popen(["bash","-c","source /etc/profile && echo $http_proxy"],stdout=subprocess.PIPE)541 proxy=px.stdout.readline().strip()542 if str(proxy) != '':543 debug('Detected proxy: '+str(proxy))544 except Exception as e:545 debug('Error getting proxy '+str(e))546 547 if __name__ == '__main__':548 if (logfilename == 'stdin'):549 if (len(sys.argv) < 2):550 sys.stderr.write('Error\n')551 sys.exit(1)552 else:553 logfilename=sys.argv[1]554 else:555 if daemon_mode == True:556 daemonize()557 else:558 start()559 else:560 if (logfilename == 'stdin'):561 sys.stderr.write('file=stdin not valid running as module\n')562 sys.exit(1)563
Note: See TracChangeset
for help on using the changeset viewer.