1 | #!/usr/bin/env python3 |
---|
2 | |
---|
3 | import time |
---|
4 | #import threading |
---|
5 | #from multiprocessing.dummy import Pool as ThreadPool |
---|
6 | from multiprocessing.dummy import Process,Lock |
---|
7 | #import multiprocessing |
---|
8 | import sys |
---|
9 | import MySQLdb as mdb |
---|
10 | import signal |
---|
11 | import os |
---|
12 | import queue |
---|
13 | import hashlib |
---|
14 | import gc |
---|
15 | import os |
---|
16 | import psutil |
---|
17 | from functools import wraps |
---|
18 | import re |
---|
19 | import daemon |
---|
20 | import lockfile |
---|
21 | import logging |
---|
22 | import logging.handlers |
---|
23 | from logging import config |
---|
24 | from functools import reduce |
---|
25 | import traceback |
---|
26 | |
---|
##### START EDITABLE VARS #####

DEBUG = False
MIN_LOG_LEVEL = logging.INFO
DAEMON = True
FILE = '/usr/lib/analytics-server/analytics/config.php'
LIMIT = 70.0  # PERCENTAGE LOAD LIMIT

##### END EDITABLE VARS #####

DEBUG_PRINT_ALIVE_COUNTER = 60
THREADED = True
MAX_SLOW_CHECK_CLIENTS = 30

# Effective logger level: without DEBUG it is always INFO; with DEBUG it is
# MIN_LOG_LEVEL when that is set (truthy) and logging.DEBUG otherwise.
loglevel = (MIN_LOG_LEVEL or logging.DEBUG) if DEBUG else logging.INFO
---|
48 | |
---|
# dictConfig description of the daemon logger: every record goes both to
# syslog (facility local6) and to stdout, using the same formatter.
_VERBOSE_FORMAT = '%(levelname)s %(module)s %(message)s'
# Alternative including process and thread ids:
# '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'

_STDOUT_HANDLER = {
    'class': 'logging.StreamHandler',
    'stream': sys.stdout,
    'formatter': 'verbose',
}

_SYSLOG_HANDLER = {
    'class': 'logging.handlers.SysLogHandler',
    'address': '/dev/log',
    'facility': "local6",
    'formatter': 'verbose',
}

LOGGING = dict(
    version=1,
    disable_existing_loggers=False,
    formatters={'verbose': {'format': _VERBOSE_FORMAT}},
    handlers={'stdout': _STDOUT_HANDLER, 'sys-logger6': _SYSLOG_HANDLER},
    loggers={
        'analyticsd-logger': {
            'handlers': ['sys-logger6', 'stdout'],
            'level': loglevel,
            'propagate': True,
        },
    },
)
---|
79 | |
---|
def to_str(x):
    """Return *x* with backslash escapes interpreted when it is a str.

    The text is UTF-8 encoded through to_utf() and decoded again with the
    'unicode_escape' codec; any non-str value is handed back untouched.
    """
    # NOTE(review): 'unicode_escape' reads the bytes as Latin-1, so non-ASCII
    # characters probably do not round-trip here; confirm this is intended.
    if not isinstance(x, str):
        return x
    return to_utf(x).decode('unicode_escape')
---|
85 | |
---|
def to_utf(x):
    """Return the UTF-8 bytes of *x* when it is a str, otherwise *x* itself."""
    return x.encode('utf-8') if isinstance(x, str) else x
---|
91 | |
---|
def printea(msg="",level='critical'):
    """Send *msg* to the module logger at the severity named by *level*.

    'critical', 'error', 'warning' and 'info' map to the logger method of the
    same name; any other value is logged at debug level.
    """
    if level in ('critical', 'error', 'warning', 'info'):
        emit = getattr(logger, level)
    else:
        emit = logger.debug
    emit(msg)
---|
103 | |
---|
def keepalive(who=""):
    """Record a heartbeat for the worker called *who*.

    The current epoch second is written to /var/run/analyticsd.keepalive on
    every call.  Outside DEBUG mode the 'main' worker additionally logs an
    "alive" line once its countdown (DEBUG_PRINT_ALIVE_COUNTER) reaches zero,
    then restarts the countdown at 60.
    """
    global DEBUG_PRINT_ALIVE_COUNTER
    t = str(int(time.time()))
    # In DEBUG mode the periodic message is skipped on purpose (too verbose).
    if not DEBUG and who == 'main':
        if DEBUG_PRINT_ALIVE_COUNTER > 0:
            DEBUG_PRINT_ALIVE_COUNTER -= 1
        else:
            DEBUG_PRINT_ALIVE_COUNTER = 60
            printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')

    # Context manager so the handle is released even if the write fails.
    with open('/var/run/analyticsd.keepalive','w') as fp:
        fp.write(t)
---|
122 | |
---|
config.dictConfig(LOGGING)
logger = logging.getLogger('analyticsd-logger')

# Database connection settings; they stay None until read from FILE.
DBNAME = USER = PASS = IP = None

EMPTY_PAUSED_SLEEP = 1  # seconds a worker sleeps while paused or idle
CHECK_LOAD_TIME = 5     # divisor used for the processed-per-second statistic
MAX_RETRIES = 5         # attempts allowed by the with_retry decorator
TIMED_ON = False        # enables the Start/End traces of the timed decorator
---|
135 | |
---|
def _read_db_settings(path):
    """Return the $dbhost/$dbname/$dbuser/$dbpass values found in *path*.

    Only lines of the form ``$name = 'value';`` are recognised.  The first
    non-empty value seen for each name wins.  Keys are the PHP variable names
    without the dollar sign; names that never appear are absent.
    """
    # Raw string: "\s" in a plain literal is an invalid escape sequence.
    # [^']+ accepts dotted hosts and passwords containing symbols.
    pattern = re.compile(r"^\s*[$](dbhost|dbname|dbuser|dbpass)\s*=\s*'([^']+)'\s*;\s*$")
    found = {}
    with open(path,'r') as f:
        for line in f:
            match = pattern.search(line)
            if match and match.group(1) not in found:
                found[match.group(1)] = match.group(2)
    return found


try:
    _db_settings = _read_db_settings(FILE)
except Exception as e:
    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
    sys.exit(1)

IP = _db_settings.get('dbhost')
DBNAME = _db_settings.get('dbname')
USER = _db_settings.get('dbuser')
PASS = _db_settings.get('dbpass')

# Same exit condition as before: give up only when nothing at all was found.
if not (IP or DBNAME or USER or PASS):
    printea("Couldn't get database configuration from {}".format(FILE))
    sys.exit(1)

_missing = [key for key in ('dbhost','dbname','dbuser','dbpass') if key not in _db_settings]
if _missing:
    printea("Missing database settings in {}: {}".format(FILE,', '.join(_missing)),'warning')

# The password is masked so credentials never reach syslog/stdout.
printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,'********' if PASS else PASS),'debug')
---|
166 | |
---|
# One DB worker per name; the single-threaded mode only keeps 'main'.
THREADS = ['main','server','client','desktop','other'] if THREADED else ['main']
---|
171 | |
---|
172 | class DB(): |
---|
def __init__(self,mon,t='main'):
    """Create the worker state for thread name *t*, owned by monitor *mon*.

    No database connection is opened here; conn stays None.
    """
    self.mon = mon
    self.t = t
    self.conn = None
    self.q = queue.Queue()
    self.empty = False
    self.need_clean = False
    self.processed = 0
    self.reconnect = 0
    printea('Database worker {} initialized'.format(t),'info')
---|
183 | |
---|
def timed(func):
    """Decorator that logs a debug timestamp before and after each call.

    The traces are emitted only while TIMED_ON is true; the wrapped
    function's return value is passed through unchanged.
    """
    def _trace(template):
        printea(template.format(func.__name__,time.time()),'debug')

    @wraps(func)
    def wrapper(*args,**kwargs):
        if TIMED_ON:
            _trace("Start({}): @{}")
        outcome = func(*args,**kwargs)
        if TIMED_ON:
            _trace("End ({}): @{}")
        return outcome
    return wrapper
---|
194 | |
---|
def with_retry(func):
    """Decorator that re-runs *func* until it stops raising.

    The wrapper injects two keyword arguments into every call unless the
    caller supplied them: ``retry`` (attempt number, starting at 1) and
    ``mon`` (a monitor object, default None).  After a failed attempt it
    sleeps ``retry ** 2`` seconds and tries again with ``retry`` incremented.

    When the attempt number reaches MAX_RETRIES, or the monitor reports
    ``terminate``, the monitor's ``term()`` is invoked (if a monitor was
    given) and SystemExit(1) is raised.
    """
    @wraps(func)
    def wrapper(*args,**kwargs):
        kwargs.setdefault('retry',1)
        kwargs.setdefault('mon',None)
        while True:
            try:
                return func(*args,**kwargs)
            except Exception as e:
                mon = kwargs['mon']
                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
                # >= rather than ==: a caller-supplied retry above the limit
                # must not retry forever.
                if kwargs['retry'] >= MAX_RETRIES or (mon and mon.terminate):
                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
                    if mon:
                        mon.term()
                    sys.exit(1)
            # Quadratic back-off before the next attempt.
            time.sleep(kwargs['retry']**2)
            kwargs['retry']+=1
    return wrapper
---|
218 | |
---|
def with_debug(func):
    """Decorator that logs the ``query`` keyword argument at debug level.

    Calls made without a ``query`` keyword are forwarded silently.
    """
    @wraps(func)
    def wrapper(*args,**kwargs):
        try:
            sql = kwargs['query']
        except KeyError:
            pass
        else:
            printea("executing query: {}".format(sql),'debug')
        return func(*args,**kwargs)
    return wrapper
---|
226 | |
---|
@with_debug
def execute(self,*args,**kwargs):
    """Run the SQL passed as keyword ``query`` on this worker's cursor.

    Returns whatever the cursor's execute() returns, or None (after logging
    a notice) when no ``query`` keyword was given.  Any failure is re-raised
    as a plain Exception whose text carries the error and the query.
    """
    if 'query' not in kwargs:
        printea("Warning execute called whithout query",'info')
        return None
    statement = kwargs['query']
    try:
        outcome = self.cur.execute(statement)
    except Exception as e:
        raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),statement))
    return outcome
---|
236 | |
---|
def init_db(self):
    """Open the MySQL connection and cursor for this worker.

    Autocommit is switched off and the session isolation level is set to
    READ COMMITTED.  When check_server_variables() returns data, it is stored
    on the monitor as ``server_variables``.

    Raises:
        Exception: wrapping any MySQLdb error, with the original as __cause__.
    """
    try:
        self.conn = mdb.connect(IP,USER,PASS,DBNAME)
        self.conn.autocommit(False)
        self.cur = self.conn.cursor()
        self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
        variables=self.check_server_variables()

        if variables:
            self.mon.server_variables=variables

        printea("Connected succesfully {}".format(self.t),'info')

    except mdb.Error as e:
        # Not every MySQLdb error carries (code, message); indexing args[1]
        # blindly would raise IndexError and hide the real failure.
        code = e.args[0] if e.args else 'unknown'
        detail = e.args[1] if len(e.args) > 1 else str(e)
        printea("Error {}: {}".format(code,detail))
        raise Exception(e) from e
---|
253 | |
---|
def check_server_variables(self):
    """Return the server's global variables as a dict (main worker only).

    Workers whose name is not 'main' get None without touching the database.
    """
    if self.t != 'main':
        return None

    printea('Getting server variables','info')
    self.cur.execute("show global variables")
    total = self.cur.rowcount
    printea('Setting {} vars'.format(total),'debug')

    variables = {}
    for _ in range(total):
        name, value = self.cur.fetchone()
        # setdefault: the first value seen for a name is the one kept.
        variables.setdefault(name,value)
    return variables
---|
267 | |
---|
def get_config(self):
    """Return the Config table as a {name: value} dict.

    If a name appears in more than one row, the first row's value is kept.
    """
    self.cur.execute('select name,value from Config')
    settings = {}
    for row in self.cur.fetchall():
        name, value = row[0], row[1]
        settings.setdefault(name,value)
    return settings
---|
274 | |
---|
def put_config(self,values):
    """Insert or update the Config rows described by the dict *values*.

    Every key and value is stored as its str() form.  The statement uses
    driver-side parameter binding, so quotes inside names or values cannot
    break the SQL.  An empty dict is a no-op.  Failures are logged and
    swallowed (best effort), as before.
    """
    if not values:
        return
    rows = [(str(name),str(value)) for name, value in values.items()]
    placeholders = ','.join(['(%s,%s)'] * len(rows))
    params = [field for row in rows for field in row]
    query = "insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(placeholders)
    try:
        self.cur.execute(query,params)
        self.conn.commit()
    except Exception as e:
        printea("Error storing config: {}".format(e),'error')
---|
284 | |
---|
def check_temporary_tables_size(self,table_name):
    """Return (size, rows) of *table_name* from information_schema.

    Only the worker named 'load' runs the query; any other worker gets None.
    The size is floor((data_length + index_length) / 1048576) as an int, and
    the lookup is restricted to the schema literally named "analytics".
    """
    if self.t != 'load':
        return None

    sql = 'select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name)
    self.cur.execute(sql)

    record = self.cur.fetchone()
    return (int(float(record[0])),int(record[1]))
---|
297 | |
---|
def close_db(self):
    """Close the connection, if one exists, and log that it was closed."""
    if not self.conn:
        return
    self.conn.close()
    printea("Closed connection {}".format(self.t),'info')
---|
302 | |
---|
def reduce_flavour(self,version,flavour):
    """Collapse a raw flavour string into 'server', 'client', 'desktop' or 'other'.

    For releases '15' and '16' the first of server, client, desktop found as
    a substring of *flavour* is returned.  Any other release, or a flavour
    containing none of them, yields 'other'.
    """
    if version in ('15','16'):
        for kind in ('server','client','desktop'):
            if kind in flavour:
                return kind
    return 'other'
---|
319 | |
---|
def reduce_version(self,version):
    """Return '15' or '16' when *version* starts with it, otherwise 'other'."""
    major = version[:2]
    return major if major in ('15','16') else 'other'
---|
325 | |
---|
def gen_uuid(self,*args,**kwargs):
    """Derive an integer id from the positional arguments.

    The str() forms of *args* are joined with '-', UTF-8 encoded and hashed
    with SHA-1; the first 16 hex digits are returned as an int.  On any
    error the traceback is printed and None is returned implicitly.
    """
    try:
        joined = u'-'.join(str(part) for part in args)
        digest = hashlib.sha1(to_utf(joined)).hexdigest()
        return int(digest[:16],16)
    except Exception as e:
        print(traceback.format_exc())
---|
333 | |
---|
@timed
@with_retry
def get_client(self,*args,**kwargs):
    """Fetch up to mon.select_window rows of tmp_clients with status=1.

    Returns a list with one dict per client, or the literal True when the
    query produced no rows.  Falsy hardware fields are replaced by the
    string 'NULL'.  Errors are re-raised as Exception("Error getting
    client: ...").
    """
    try:
        query="SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
        self.execute(query=query)
        pending = self.cur.rowcount
        if pending <= 0:
            return True

        clients = []
        for _ in range(pending):
            (v_id,v_date,v_user,v_version,v_flavour,
             v_arch,v_mem,v_vga,v_cpu,v_ncpu) = self.cur.fetchone()
            version = self.reduce_version(v_version)
            flavour = self.reduce_flavour(version,v_flavour)
            # The uuid is built from the raw (unreduced) release and flavour.
            uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
            clients.append({
                'uuid':uuid,
                'id':v_id,
                'date':v_date,
                'uid':v_user,
                'version':version,
                'flavour':flavour,
                'rversion':v_version,
                'rflavour':v_flavour,
                'arch':v_arch or 'NULL',
                'mem':v_mem or 'NULL',
                'vga':v_vga or 'NULL',
                'cpu':v_cpu or 'NULL',
                'ncpu':v_ncpu or 'NULL',
            })
        return clients
    except Exception as e:
        raise Exception("Error getting client: {}".format(e))
---|
363 | |
---|
@timed
@with_retry
def get_apps(self,*args,**kwargs):
    """Load the tmp_packages rows of the given clients, grouped by flavour.

    Expects the keyword ``clients`` (list of dicts as built by get_client).
    Returns {flavour: {'apps': [...], 'clients': [...], 'timestamp': None}},
    or None (after logging a notice) when ``clients`` is missing.  Errors
    are re-raised as Exception("Error getting apps: ...").
    """
    if 'clients' not in kwargs:
        printea("Warning executed without named parameter clients",'info')
        return None
    grouped = {}
    try:
        # client id -> (release, flavour); the first entry for an id wins.
        release_of = {}
        for cli in kwargs['clients']:
            if cli['id'] not in release_of:
                release_of[cli['id']] = (cli['version'],cli['flavour'])
            if cli['flavour'] not in grouped:
                grouped[cli['flavour']] = {'apps':[],'clients':[],'timestamp':None}
            grouped[cli['flavour']]['clients'].append(cli)

        id_list = ','.join(str(cid) for cid in release_of)
        query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(id_list)
        self.execute(query=query)
        for _ in range(self.cur.rowcount):
            clid, when, app, value = self.cur.fetchone()
            rel, fla = release_of[clid]
            uuid = self.gen_uuid(when.month,when.year,rel,fla,app)
            grouped[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':when,'app':app,'value':value})
        return grouped
    except Exception as e:
        raise Exception("Error getting apps: {}".format(e))
---|
393 | |
---|
@timed
@with_retry
def put_client(self,*args,**kwargs):
    """Insert the clients given as keyword ``client_list`` into Client_Versions.

    Rows whose uuid already exists are left untouched.  Returns True;
    raises Exception when ``client_list`` is missing.
    """
    # NOTE(review): the statement is assembled by string formatting from
    # client-reported fields; consider driver-side parameter binding.
    if 'client_list' not in kwargs:
        raise Exception("Called without named parameter client_list")
    row_template = "({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{})"
    rows = [
        row_template.format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour'],cli['arch'],cli['mem'],cli['vga'],cli['cpu'],cli['ncpu'])
        for cli in kwargs['client_list']
    ]
    query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`) VALUES {} on duplicate key update uuid=uuid".format(','.join(rows))
    self.execute(query=query)
    return True
---|
405 | |
---|
@timed
@with_retry
def put_apps(self,*args,**kwargs):
    """Add the package counters given as keyword ``apps`` to RecvPackages.

    Entries sharing a uuid are summed first; existing rows have their count
    increased by the inserted value.  Returns True; raises Exception when
    ``apps`` is missing.
    """
    # NOTE(review): the statement is assembled by string formatting from
    # client-reported fields; consider driver-side parameter binding.
    if 'apps' not in kwargs:
        raise Exception("Called without named parameter apps")
    merged = {}
    for app in kwargs['apps']:
        key = str(app['uuid'])
        if key in merged:
            merged[key]['value'] += app['value']
        else:
            merged[key] = {'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
    rows = [
        "({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value'])
        for item in merged.values()
    ]
    query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(rows))
    self.execute(query=query)
    return True
---|
425 | |
---|
@timed
@with_retry
def del_client(self,*args,**kwargs):
    """Delete from tmp_clients the ids in keyword ``client_list`` (status=1 only).

    Returns True; raises Exception when ``client_list`` is missing.
    """
    if 'client_list' not in kwargs:
        raise Exception("Called without named parameter client_list")
    id_list = ','.join(str(cid) for cid in kwargs['client_list'])
    self.execute(query="DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(id_list))
    return True
---|
434 | |
---|
@timed
@with_retry
def del_apps(self,*args,**kwargs):
    """Delete from tmp_packages every row of the clients in ``client_list``.

    Returns True; raises Exception when ``client_list`` is missing.
    """
    if 'client_list' not in kwargs:
        raise Exception("Called without named parameter client_list")
    id_list = ','.join(str(cid) for cid in kwargs['client_list'])
    self.execute(query="DELETE FROM tmp_packages WHERE client IN ({})".format(id_list))
    return True
---|
443 | |
---|
@timed
def reset_autoinc(self):
    """Truncate both temporary tables once tmp_packages' AUTO_INCREMENT is high.

    The tables are truncated only when the counter exceeds 65500 and the
    monitor reports all queues empty.  Returns True; errors are re-raised as
    Exception("Error reseting auto_increment: ...").
    """
    try:
        self.execute(query="SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME))
        next_id = self.cur.fetchone()[0]
        # A former extra guard (both tables counted as empty) is disabled.
        if next_id > 65500 and self.mon.all_queues_empty():
            for table in ("tmp_clients","tmp_packages"):
                self.execute(query="TRUNCATE TABLE {}".format(table))
        return True
    except Exception as e:
        raise Exception("Error reseting auto_increment: {}".format(e))
---|
466 | |
---|
467 | |
---|
@timed
def process_main_thread(self,*args,**kwargs):
    """Producer step of the 'main' worker.

    Pulls a window of pending clients with their packages, deletes the
    clients from tmp_clients and hands the data to the per-flavour queues
    (or to the main queue when not THREADED).

    Returns True when work was queued or the call was only a throttling
    sleep, and False when no clients were pending.
    """
    # Throttle: while the slow-check countdown is positive, just sleep 1s.
    if self.mon.slow_check_clients > 0 :
        self.mon.slow_check_clients -= 1
        time.sleep(1)
        return True
    else:
        self.mon.slow_check_clients = self.mon.default_slow_check_clients

    # get_client() returns the literal True when the query found no rows.
    clis=self.get_client(mon=self.mon)
    if clis == True: #No clients found (empty)
        self.empty = True # if > 65500 auto_increment was reset
        self.mon.schedule(event='NO_MORE_CLIENTS')
        return False
    #clients found
    self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
    # if clients found get apps
    lapps = self.get_apps(clients=clis,mon=self.mon)
    #lapps can be empty
    # NOTE(review): no mon= is passed here, and error_func is not a keyword
    # the retry decorator looks at; confirm that is intended.
    self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
    #If deletion was failed , thread was died
    lapps_tmp={'apps':[],'clients':[]}
    for fla in lapps:
        if THREADED:
            # The timestamp lets the consumer wait for a later main commit.
            lapps[fla]['timestamp']=time.time()
            self.mon.db[fla].q.put(lapps[fla],True)
        else:
            lapps_tmp['apps'].extend(lapps[fla]['apps'])
            lapps_tmp['clients'].extend(lapps[fla]['clients'])
            # NOTE(review): the same accumulating dict is queued once per
            # flavour; confirm the intended nesting of this put.
            self.mon.db['main'].q.put(lapps_tmp,True)
    #if DEBUG:
    self.processed+=len(clis)
    return True
---|
501 | |
---|
@timed
def process_all_threads(self,*args,**kwargs):
    """Consumer step: store one queued batch of clients and packages.

    Takes one item from this worker's queue, inserts its clients and apps
    and removes the processed packages from tmp_packages.  Returns True on
    success; the False branches re-queue the batch first.
    """
    lapps=self.q.get(True)
    #print "Running {}".format(self.t)
    if THREADED:
        # Busy-wait until mon.commited is not older than the batch timestamp.
        while (lapps['timestamp'] > self.mon.commited):
            time.sleep(0.001)
    if len(lapps['clients']) != 0:
        printea('Thread {} putting client'.format(self.t),'debug')
        #IF FAIL, AFTER RETRIES THREAD DIES
        if not self.put_client(client_list=lapps['clients'],mon=self.mon):
            self.q.put(lapps,True) # USELESS
            return False # USELESS
    if len(lapps['apps']) != 0:
        printea('Thread {} putting clientapps'.format(self.t),'debug')
        if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
            self.q.put(lapps,True) # USELESS
            return False # USELESS
    # NOTE(review): confirm whether this delete belongs inside the apps
    # branch above or runs for every batch.
    if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
        self.q.put(lapps,True) # USELESS
        return False # USELESS
    #if DEBUG:
    self.processed+=len(lapps['clients'])
    return True
---|
526 | |
---|
def process(self,*args,**kwargs):
    """Run one iteration of this worker and report whether to commit.

    The 'main' worker first runs its producer step; every worker then
    consumes one item from its own queue when one is available.  An empty
    queue sets the ``empty`` flag.  worker() commits when the result is
    truthy and rolls back otherwise.
    """
    keepalive(self.t)
    # warning too much verbose, printea("Running thread {}".format(self.t),'debug')
    if self.t == 'main' and not self.mon.terminate:
        ret=self.process_main_thread(*args,**kwargs)
        if ret == False: #No more clients available
            return True
        if ret == True: #Clients was put on queues, need process more without waiting, main queue always empty
            if THREADED:
                return True #Need return to avoid put empty flag and wait into main thread

    #after this poing, code for all threads
    if not self.q.empty():
        ret=self.process_all_threads(*args,**kwargs)
        if ret == False: # USELESS? THREADS was died
            printea("Error threads")
        return ret
    else:
        # Replace the drained queue with a fresh one and flag the worker idle.
        del self.q
        self.q = queue.Queue()
        self.empty = True
        return True
---|
549 | |
---|
def worker(self):
    """Thread body: process batches in transactions until asked to stop.

    The loop ends once the monitor has ``terminate`` set and this worker is
    flagged ``empty``.  While paused or idle it only sleeps.  Otherwise it
    opens a transaction, calls process() and commits or rolls back on its
    result.  A missing connection is created on demand; any other error
    triggers a rollback attempt and a reconnect with growing delay.
    """
    printea("Starting worker {} processing".format(self.t),'info')
    while not (self.mon.terminate and self.empty):
        if self.mon.paused or self.empty:
            #if self.empty and self.t == 'main':
            #    self.reset_autoinc()
            if self.mon.paused:
                printea("Paused by high load {}".format(self.t),'debug')
            # Too much verbose
            #if self.empty:
            #    printea("Empty queue {} sleeping by now".format(self.t),'debug')
            if self.empty:
                # Clear the flag so the next iteration polls again.
                self.empty = False
            time.sleep(EMPTY_PAUSED_SLEEP)
        else:
            try:
                if self.conn == None:
                    # Handled just below by opening the connection.
                    raise EnvironmentError('Connection not available, probably it\'s a first run')
                else:
                    self.conn.begin()
                if self.process():
                    self.conn.commit()
                    # Consumers compare batch timestamps against this value.
                    self.mon.commited=time.time()
                    self.reconnect = 0
                    if self.need_clean:
                        gc.collect()
                else:
                    self.conn.rollback()
            except EnvironmentError as e:
                printea(e)
                # NOTE(review): an exception raised by init_db() here is not
                # caught inside this loop.
                self.init_db()

            except Exception as e:
                try:
                    if self.conn != None:
                        self.conn.rollback()
                except:
                    printea("Can't rollback last actions",'info')
                    pass
                #if e[0] != 2006:
                #    printea("Exception processing worker({}): {}".format(self.t,e))
                #if e[0] == 2006: # SERVER GONE AWAY
                #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))

                printea("Trying to recover connection ({}), {}".format(self.t,e))
                if self.reconnect == 100:
                    printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
                    self.mon.term()
                else:
                    # Quadratic back-off: sleep reconnect**2 seconds.
                    self.reconnect+=1
                    printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
                    time.sleep(self.reconnect*self.reconnect)

                # NOTE(review): confirm whether this re-init is meant to run
                # after term() as well, or only in the back-off branch.
                try:
                    self.init_db()
                    printea("Recovered worker {} connection".format(self.t),'info')
                except:
                    printea('Unable to initialize worker {}'.format(self.t))
                    pass
---|
609 | |
---|
610 | |
---|
611 | class Monitor(): |
---|
def __init__(self):
    """Set limits and run-time state, hook termination signals, build workers.

    One DB object is created per name in THREADS; no database connection is
    opened here.
    """
    # Tuning limits
    self.MAX_QUEUE_UTILIZATION = 100
    self.USE_MAX_QUEUES = True
    self.MIN_SELECT_WINDOW = 32
    self.MAX_SELECT_WINDOW = (2 ** 13) +1
    self.MAX_MEM = 512
    self.MEM_USED = 0
    self.MIN_FREE_MEM_SERVER = 100
    self.USE_MAX_MEM = True

    # Run-time state
    self.lock = Lock()
    self.terminate = False
    self.finished = False
    self.paused = False
    self.select_window = self.MIN_SELECT_WINDOW
    self.commited = time.time()
    self.procesed = 0
    self.procesed_per_sec = [0]*10
    self.load = 0
    self.loadlist = [ 0.0 ] * 100
    self.server_mem = 0
    self.max_heap = None
    self.server_variables = None # initialized by main worker
    self.default_slow_check_clients = 1
    self.slow_check_clients = self.default_slow_check_clients

    # Table statistics, each kept in its own dict
    self.temporary_tables_size = dict(clients=0.0, packages=0.0, sum=0.0)
    self.temporary_tables_rows = dict(clients=0, packages=0, sum=0)
    self.db_tables_size = dict(clients=0.0, packages=0.0, sum=0.0)
    self.db_tables_rows = dict(clients=0, packages=0, sum=0)

    # QUIT, TERM and INT all request termination through term().
    for signum in (signal.SIGQUIT, signal.SIGTERM, signal.SIGINT):
        signal.signal(signum,self.term)

    self.threads = {}
    self.db = {name: DB(self,name) for name in THREADS}
    self.cfg = None
    printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
---|
656 | |
---|
def windowctl(self, *args, **kwargs):
    """Grow ('+') or shrink ('-') the select window by a factor of two.

    Growing happens only while the doubled value stays below
    MAX_SELECT_WINDOW; shrinking only while the window is above
    MIN_SELECT_WINDOW.  The window is always stored as an int.
    """
    action = args[0]
    if action == '+':
        if self.select_window*2 < self.MAX_SELECT_WINDOW:
            self.select_window = int(self.select_window * 2)
    elif action == '-':
        if self.select_window > self.MIN_SELECT_WINDOW:
            self.select_window = int(self.select_window / 2)
---|
666 | |
---|
def slowcheckctl(self, *args, **kwargs):
    """Adjust the default slow-check countdown.

    'reset' sets it to 0.  '+' moves 0 to 1 and otherwise doubles it, but
    only while the current value is below MAX_SLOW_CHECK_CLIENTS.
    """
    action = args[0]
    if action == 'reset':
        self.default_slow_check_clients = 0
    elif action == '+' and self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
        current = self.default_slow_check_clients
        self.default_slow_check_clients = 1 if current == 0 else current * 2
---|
676 | |
---|
677 | |
---|
def schedule(self, *args, **kwargs):
    """React to a scheduling event given as keyword ``event``.

    NO_MORE_CLIENTS shrinks the select window and slows polling.
    HAVE_CLIENTS (with keyword ``nclients``) grows the window when the last
    fetch filled it, shrinks it otherwise, and resets the slow-check counter.
    CHECK_SANITY re-evaluates the pause flag and the memory and queue limits.
    """
    if kwargs['event'] == 'NO_MORE_CLIENTS':
        self.windowctl('-')
        self.slowcheckctl('+')

    if kwargs['event'] == 'HAVE_CLIENTS':

        # A full window suggests more rows are waiting, so fetch more next time.
        if kwargs['nclients'] == self.select_window:
            self.windowctl('+')
        else:
            self.windowctl('-')
        # NOTE(review): confirm the reset applies to both branches above.
        self.slowcheckctl('reset')
#
#        if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
#            self.windowctl('+')
#
#        if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
#            self.windowctl('-')
#            self.slowcheckctl('+')
#        else:
#            self.windowctl('+')
#
#        elif kwargs['nclients'] < self.select_window/2:
#            self.windowctl('-')
#
#        self.slowcheckctl('reset')

    if kwargs['event']=='CHECK_SANITY':
        # CPU SETTINGS
        # Pause workers while the load is above LIMIT, unless terminating.
        if not self.terminate and self.load > LIMIT:
            self.paused = True
        else:
            self.paused = False

        # DB MEM SETTINGS
        # Above 40% of max_heap: grow the window and lift any pause.
        if self.max_heap and self.temporary_tables_size['sum']:
            if self.temporary_tables_size['sum'] > self.max_heap * 0.4:
                self.windowctl('+')
                if self.paused:
                    #printea('Hitting max temporary table size unpausing','critical')
                    self.paused = False
        # SERVER MEM
        if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
            #printea('Hitting max memory from server collecting and reducing window','critical')
            self.windowctl('-')
            self.slowcheckctl('+')
            self.USE_MAX_QUEUES=True
            # Ask every worker to run gc.collect() after its next commit.
            for x in THREADS:
                self.db[x].need_clean=True
            gc.collect()
        else:
#            self.USE_MAX_QUEUES=False
            for x in THREADS:
                self.db[x].need_clean=False

        if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
            self.windowctl('-')
            self.slowcheckctl('+')

        # QUEUES
        if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
            self.windowctl('+')
---|
740 | |
---|
741 | |
---|
def get_cpu_load(self):
    """Record a new CPU sample and return the windowed load.

    The newest psutil.cpu_percent() reading is appended and the oldest one
    dropped; the result is the sum of the samples divided by 100.0, rounded
    to two decimals.
    """
    self.loadlist.append(psutil.cpu_percent())
    self.loadlist = self.loadlist[1:]
    total = reduce(lambda acc, sample: acc + sample, self.loadlist, 0.0)
    return round(total/100.0,2)
---|
749 | |
---|
def put_config(self,key='',value=''):
    """Placeholder: accepts a key/value pair and does nothing."""
    return None
---|
752 | |
---|
def term(self,*args,**kwargs):
    """Request shutdown by setting ``terminate``.

    Accepts arbitrary arguments because it is also registered as a signal
    handler.
    """
    printea("Begin kill the program, wait please...",'info')
    self.terminate = True
---|
756 | |
---|
def prepare_threads(self):
    """Start the 'load' thread and then one worker thread per THREADS entry.

    Each thread is stored in self.threads under its name, and its daemon
    flag follows the module-level DAEMON setting.
    """
    def _launch(name, target):
        self.threads[name] = Process(target=target)
        self.threads[name].daemon = DAEMON
        self.threads[name].start()

    _launch('load', self.get_load)
    for name in THREADS:
        _launch(name, self.db[name].worker)
---|
767 | |
---|
def get_mem_usage(self):
    """Return this process's memory use in MiB as a float.

    Uses the first field of psutil's memory_info() for the current pid.
    """
    first_field = psutil.Process(os.getpid()).memory_info()[0]
    return first_field / float(2 ** 20)
---|
772 | |
---|
def get_server_free_mem(self):
    """Return the ``free`` figure of psutil.virtual_memory() in MiB."""
    return psutil.virtual_memory().free / (2 ** 20)
---|
776 | |
---|
def print_stats(self):
    """Publish the current counters to ``self.cfg`` and optionally log them.

    Stored values: per-thread and total processed counts, a processing rate
    derived from the last ten samples, per-queue and total queue sizes (only
    when THREADED), the select window, memory use, load, paused flag and the
    temporary/permanent table sizes. When DEBUG is set, one summary line is
    also emitted through ``printea``.
    """
    # --- processed counters -------------------------------------------
    summary = "Processed: "
    per_thread = ''
    total = 0
    for name in THREADS:
        per_thread += '{}={} '.format(name, self.db[name].processed)
        self.cfg.store('processed ' + name, self.db[name].processed)
        # In threaded mode the 'main' entry is left out of the total.
        if not THREADED or name != 'main':
            total += self.db[name].processed
    summary += "TOTAL={} {}".format(total, per_thread)
    self.cfg.store('processed total', total)

    # --- processing rate over the last ten samples ----------------------
    self.procesed_per_sec.append(int((total - self.procesed) / CHECK_LOAD_TIME))
    self.procesed_per_sec = self.procesed_per_sec[-10:]
    suma_proc = sum(self.procesed_per_sec)
    # The divisor is a fixed 10, even while fewer samples are available.
    self.cfg.store('processing at', int(suma_proc / 10))
    self.procesed = total

    # --- queue sizes ------------------------------------------------------
    queued = 0
    if THREADED:
        summary += "Queues: "
        sizes = self.get_q_sizes()
        per_queue = ''
        for name, size in sizes.items():
            self.cfg.store('queue ' + name, size)
            per_queue += '{}={} '.format(name, size)
            queued += size
        self.cfg.store('queue totals', queued)
        summary += "TOTAL={} {}".format(queued, per_queue)

    # --- general state ----------------------------------------------------
    self.cfg.store('select window', self.select_window)
    self.cfg.store('mem used', self.MEM_USED)
    self.cfg.store('load', self.load)
    self.cfg.store('paused', 1 if self.paused else 0)
    self.cfg.store('temp_clients_size', int(self.temporary_tables_size['clients']))
    self.cfg.store('temp_packages_size', int(self.temporary_tables_size['packages']))
    self.cfg.store('db_clients_size', int(self.db_tables_size['clients']))
    self.cfg.store('db_packages_size', int(self.db_tables_size['packages']))

    if DEBUG:
        # Threaded and non-threaded mode log exactly the same line.
        printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),summary,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
---|
830 | |
---|
def get_q_sizes(self):
    """Return a dict mapping each name in THREADS to ``self.db[name].q.qsize()``."""
    return {name: self.db[name].q.qsize() for name in THREADS}
---|
836 | |
---|
def sum_q_sizes(self):
    """Return the combined size of all queues reported by ``get_q_sizes()``.

    Returns 0 when there are no queues at all; the previous ``reduce``
    without an initial value raised ``TypeError`` in that case.
    """
    return sum(self.get_q_sizes().values())
---|
841 | |
---|
def all_queues_empty(self):
    """Return True when every size reported by ``get_q_sizes()`` is zero.

    With no queues at all the result is True as well.
    """
    return all(size == 0 for size in self.get_q_sizes().values())
---|
848 | |
---|
def get_load(self):
    """Monitoring loop; meant to run on its own thread.

    Opens a dedicated 'load' DB connection, builds ``self.cfg`` on top of
    it and then, once per second until both ``terminate`` and ``finished``
    are set:

    * stores a 'keepalive' timestamp in the config,
    * refreshes ``self.load`` from ``get_cpu_load()``,
    * raises the 'CHECK_SANITY' event through ``self.schedule``.

    Whenever the tick counter exceeds CHECK_LOAD_TIME it also writes the
    config, refreshes the memory figures and the table size/row counters,
    resets the auto-increment when the temporary tables and all queues are
    empty, publishes the statistics and triggers a garbage collection if
    the queues are empty. The connection is closed when the loop ends.
    """
    db = DB(self, 'load')
    db.init_db()
    self.cfg = Config(db)
    ticks = 0
    while not self.terminate or not self.finished:
        self.cfg.store('keepalive', int(time.time()))
        time.sleep(1)
        self.load = self.get_cpu_load()
        ticks += 1
        self.schedule(event='CHECK_SANITY')
        if ticks <= CHECK_LOAD_TIME:
            continue

        # Periodic (slow) part of the loop.
        self.cfg.write()
        ticks = 0

        self.MEM_USED = self.get_mem_usage()
        self.server_mem = self.get_server_free_mem()

        variables = self.server_variables
        if variables and 'max_heap_table_size' in variables:
            self.max_heap = int(variables['max_heap_table_size']) / (2 ** 20)

        # (size dict, rows dict, key, table name) for every inspected table.
        probes = (
            (self.temporary_tables_size, self.temporary_tables_rows, 'clients', 'tmp_clients'),
            (self.temporary_tables_size, self.temporary_tables_rows, 'packages', 'tmp_packages'),
            (self.db_tables_size, self.db_tables_rows, 'clients', 'Client_Versions'),
            (self.db_tables_size, self.db_tables_rows, 'packages', 'RecvPackages'),
        )
        for size_by_key, rows_by_key, key, table in probes:
            size_by_key[key], rows_by_key[key] = db.check_temporary_tables_size(table)

        # Aggregate clients + packages for each of the four counters.
        for counters in (self.temporary_tables_size, self.temporary_tables_rows,
                         self.db_tables_size, self.db_tables_rows):
            counters['sum'] = counters['clients'] + counters['packages']

        qempty = self.all_queues_empty()
        tmp_rows = self.temporary_tables_rows
        if tmp_rows['clients'] == 0 and tmp_rows['packages'] == 0 and qempty:
            db.reset_autoinc()
        self.print_stats()
        if qempty:
            gc.collect()
    db.close_db()
---|
889 | |
---|
def start(self):
    """Start the monitor by delegating to ``prepare_threads()``."""
    self.prepare_threads()
---|
892 | |
---|
def end(self):
    """Wait for the worker threads, close their connections and publish final stats.

    For every key of ``self.db`` the matching thread is joined and then the
    connection is closed. Afterwards ``finished`` is set and
    ``print_stats()`` is called one last time.
    """
    for name in self.db:
        worker_thread = self.threads[name]
        worker_thread.join()
        connection = self.db[name]
        connection.close_db()
    self.finished = True
    self.print_stats()
---|
899 | |
---|
900 | |
---|
class Config():
    """Attribute bag for configuration values, optionally backed by a DB object.

    Every instance attribute whose name does not start with ``_`` is treated
    as a configuration entry. The backing object (``_db``) is expected to
    offer ``get_config()`` returning a mapping and ``put_config(mapping)``.
    """

    def __init__(self, connection):
        """Keep ``connection`` as the backing store and load its values."""
        self._db = connection
        self.read()

    def store(self, var, value):
        """Set entry ``var`` (spaces become underscores) to ``value``.

        Strings made only of decimal digits are stored as ``int``; any other
        string is stored as ``str``. Non-string values are converted with
        ``int()``, so floats are truncated.
        """
        var = var.replace(' ', '_')
        if isinstance(value, str):
            # isdecimal() matches exactly what int() can parse; isnumeric()
            # also accepts characters such as superscripts or fractions,
            # which made int() raise ValueError.
            parsed = int(value) if value.isdecimal() else str(value)
        else:
            parsed = int(value)
        setattr(self, var, parsed)

    def write(self):
        """Send all entries, stringified, to the backing store (if any)."""
        if self._db:
            values = {str(name): str(getattr(self, name))
                      for name in self.get_internal_vars()}
            self._db.put_config(values)

    def read(self):
        """Load entries from the backing store, or report that there is none.

        Decimal strings become ``int``; every other value is kept unchanged
        (non-string values no longer raise ``AttributeError``).
        """
        if self._db:
            config = self._db.get_config()
            for key, raw in config.items():
                if isinstance(raw, str) and raw.isdecimal():
                    setattr(self, key, int(raw))
                else:
                    setattr(self, key, raw)
        else:
            print('No config yet')

    def get_internal_vars(self):
        """Return the names of all entries (attributes not starting with '_')."""
        return [name for name in self.__dict__ if not name.startswith('_')]

    def print(self):
        """Print every entry as ``name = value``."""
        for v in self.get_internal_vars():
            print('{} = {}'.format(v, getattr(self, v)))
---|
940 | |
---|
941 | |
---|
def main(*args, **kwargs):
    """Run the monitor until its ``terminate`` flag is raised.

    When DAEMON is set, the current pid is first written to
    ``/var/run/analyticsd.pid``. A ``Monitor`` is then created and started,
    the function polls ``terminate`` every half second, and finally calls
    ``end()`` on the monitor. Positional and keyword arguments are accepted
    and ignored.
    """
    gc.enable()
    if DAEMON:
        # Context manager: the handle is closed even if the write fails.
        with open('/var/run/analyticsd.pid', 'w') as fp:
            fp.write(str(os.getpid()))
    m = Monitor()
    m.start()
    printea("start done", 'info')
    while not m.terminate:
        time.sleep(0.5)
    m.end()
    printea("Exitting...", 'info')
---|
955 | |
---|
if __name__ == "__main__":
    # Single-instance guard: count the processes whose command line has one
    # argument ending in the interpreter name and one ending in the daemon
    # name. This process is counted too, so more than one match means that
    # another instance is already running.
    exit = 0
    keyword='analyticsd'
    interpreter='python3'
    for proc in psutil.process_iter():
        try:
            # cmdline is a method in the psutil API this file relies on
            # (memory_info() is used elsewhere); iterating the bound method
            # itself raised TypeError.
            arguments = proc.cmdline()
        except psutil.Error:
            # The process vanished or cannot be inspected; it cannot be
            # identified as another instance, so skip it.
            continue
        a = any(argument.endswith(interpreter) for argument in arguments)
        b = any(argument.endswith(keyword) for argument in arguments)
        if a and b:
            exit = exit +1
    if exit > 1:
        printea('Another daemon is running','error')
        sys.exit(1)

    lck = '/var/run/analyticsd'
    if DAEMON:
        # A leftover lock file blocks the start instead of being overwritten.
        if os.path.isfile(lck+'.lock'):
            printea('Lockfile {} detected, unable to start'.format(lck),'error')
            sys.exit(1)
        else:
            try:
                # Detach and run main() inside the daemon context; the file
                # descriptor of the first logger handler's socket is kept
                # open across the detach.
                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
                    main()
            except Exception as e:
                printea(e)
                sys.exit(1)
    else:
        # Foreground mode.
        main()
---|