1 | #!/usr/bin/env python |
---|
2 | ###### |
---|
3 | #Scheduler library for Lliurex |
---|
4 | #Add to N4D the scheduled tasks |
---|
5 | #This class reads the json file with the scheduled tasks and |
---|
6 | #distributes the info among the clients |
---|
7 | # -*- coding: utf-8 -*- |
---|
8 | import os |
---|
9 | import json |
---|
10 | from datetime import date |
---|
11 | |
---|
class SchedulerServer():
    """Server side of the Lliurex scheduler.

    Tasks are stored as JSON files inside ``tasks_dir``, one file per task
    name (spaces in the name become underscores in the file name).  Every file
    maps the task name to a dict of ``serial -> task data``.  Entries whose
    ``spread`` flag equals True are served as remote tasks (their serial is
    prefixed with ``r``); entries without the flag, or with it equal to False,
    are served as local tasks.
    """

    def __init__(self):
        # Any truthy value enables the messages printed by _debug().
        self.dbg = 0
        self.tasks_dir = "/etc/scheduler/tasks.d"
        self.schedTasksDir = self.tasks_dir + "/scheduled"
        self.available_tasks_dir = "/etc/scheduler/conf.d/tasks"
        self.custom_tasks = self.available_tasks_dir + "/personal.json"
        self.remote_tasks_dir = self.tasks_dir + "/remote"
        self.local_tasks_dir = self.tasks_dir + "/local"
        self.commands_file = '/etc/scheduler/conf.d/commands/commands.json'
    #def __init__

    def _debug(self, msg):
        """Print *msg* with the scheduler prefix when debugging is enabled."""
        if self.dbg:
            print("Scheduler: %s" % msg)
    #def _debug

    def _serial_key(self, serial):
        """Return a sort key ordering numeric serials by value, others last."""
        text = str(serial)
        if text.isdigit():
            return (0, int(text), text)
        return (1, 0, text)
    #def _serial_key

    def _local_serial(self, serial):
        """Return *serial* as a string without the leading 'r' remote marker."""
        text = str(serial)
        if text.startswith('r'):
            text = text[1:]
        return text
    #def _local_serial

    def _task_file(self, task_name):
        """Return the path of the JSON file that stores *task_name*."""
        # Only the file name is normalised; the directory is used verbatim.
        return self.tasks_dir + '/' + str(task_name).replace(' ', '_')
    #def _task_file

    def _load_json(self, path):
        """Parse the JSON document stored in *path*.

        Raises IOError/OSError when the file cannot be read and ValueError
        when its content is not valid JSON.
        """
        with open(path, "rb") as json_file:
            return json.loads(json_file.read())
    #def _load_json

    def _write_json(self, path, content):
        """Write *content* to *path* as indented JSON.

        The document is serialised before the file is opened so that a
        serialisation error cannot leave a truncated file behind.
        """
        payload = json.dumps(content, indent=4)
        with open(path, 'w') as json_file:
            json_file.write(payload)
    #def _write_json

    def _is_expired(self, data, today):
        """Tell whether an ``autoremove`` task is already in the past.

        Only tasks carrying the ``autoremove`` key can expire.  A numeric
        month earlier than the current one expires the task; a numeric month
        later than the current one keeps it regardless of the day.  Otherwise
        (current month, or a non numeric month such as ``*``) a numeric day
        earlier than today expires it.  Task data holds no year, so a task
        scheduled for next year cannot be told apart from a past one.
        """
        if not isinstance(data, dict) or 'autoremove' not in data:
            return False
        mon = str(data.get('mon', '*'))
        dom = str(data.get('dom', '*'))
        if mon.isdigit():
            if int(mon) < today.month:
                return True
            if int(mon) > today.month:
                # Still to come this year: the day must not be compared.
                return False
        return dom.isdigit() and int(dom) < today.day
    #def _is_expired

    def get_tasks(self, *args):
        """Return ``{'status': True, 'data': tasks}`` with every stored task."""
        return self._read_wrkfiles(self.tasks_dir)
    #def get_tasks

    def get_local_tasks(self, *args):
        """Return the tasks that must run on this machine only.

        Expired ``autoremove`` entries are deleted from disk first.  Deleting
        renumbers the remaining serials, so removals are done from the highest
        serial downwards and the task files are read again afterwards; the
        serials returned therefore match what is stored.

        Returns ``{'status': bool, 'data': {name: {serial: data}}}`` where
        status is True when at least one local task exists.
        """
        today = date.today()
        tasks_data = self._read_wrkfiles(self.tasks_dir)['data']

        expired = []
        for task_name, serial_data in tasks_data.items():
            if not isinstance(serial_data, dict):
                continue
            for serial, data in serial_data.items():
                if self._is_expired(data, today):
                    expired.append((task_name, serial))

        if expired:
            expired.sort(key=lambda item: self._serial_key(item[1]), reverse=True)
            for task_name, serial in expired:
                self._debug("Autoremoving %s %s" % (task_name, serial))
                self.remove_task({'name': task_name, 'serial': serial})
            tasks_data = self._read_wrkfiles(self.tasks_dir)['data']

        local_tasks = {}
        for task_name, serial_data in tasks_data.items():
            if not isinstance(serial_data, dict):
                continue
            for serial, data in serial_data.items():
                if not isinstance(data, dict):
                    continue
                if self._is_expired(data, today):
                    # Removal failed; an expired task is still never served.
                    continue
                # A missing flag counts as local, exactly like spread == False.
                if data.get('spread', False) == False:
                    local_tasks.setdefault(task_name, {})[serial] = data
        return {'status': bool(local_tasks), 'data': local_tasks}
    #def get_local_tasks

    def get_remote_tasks(self, *args):
        """Return the tasks flagged to be spread to the clients.

        Returns ``{'status': bool, 'data': {name: {'r'+serial: data}}}`` where
        status is True when at least one remote task exists.
        """
        remote_tasks = {}
        tasks_data = self._read_wrkfiles(self.tasks_dir)['data']
        for task_name, serial_data in tasks_data.items():
            if not isinstance(serial_data, dict):
                continue
            for serial, data in serial_data.items():
                if isinstance(data, dict) and data.get('spread') == True:
                    remote_tasks.setdefault(task_name, {})['r%s' % serial] = data
        return {'status': bool(remote_tasks), 'data': remote_tasks}
    #def get_remote_tasks

    def get_available_tasks(self):
        """Return the task definitions found in ``available_tasks_dir``."""
        return self._read_wrkfiles(self.available_tasks_dir)
    #def get_available_tasks

    def _read_wrkfiles(self, folder):
        """Merge every readable JSON file of *folder* into a single dict."""
        tasks = {}
        wrkfiles = self._get_wrkfiles(folder)
        self._debug(folder)
        for wrkfile in wrkfiles:
            task = self._read_tasks_file(wrkfile)
            if task:
                tasks.update(task)
        self._debug("Tasks loaded")
        self._debug(str(tasks))
        return {'status': True, 'data': tasks}
    #def _read_wrkfiles

    def _get_wrkfiles(self, folder):
        """List the entries of *folder*, creating the folder when missing.

        Entries are returned sorted so that merging them is deterministic.
        """
        if not os.path.isdir(folder):
            os.makedirs(folder)
        return [folder + '/' + entry for entry in sorted(os.listdir(folder))]
    #def _get_wrkfiles

    def _read_tasks_file(self, wrkfile):
        """Return the dict stored in *wrkfile*.

        An empty dict is returned when the path is not a regular file, cannot
        be read, is not valid JSON or does not hold a JSON object.
        """
        self._debug("Opening %s" % wrkfile)
        tasks = {}
        if os.path.isfile(wrkfile):
            try:
                content = self._load_json(wrkfile)
            except (IOError, OSError, ValueError) as e:
                self._debug("unable to open %s. Reason: %s" % (wrkfile, e))
            else:
                if isinstance(content, dict):
                    tasks = content
                else:
                    self._debug("%s does not contain a JSON object" % wrkfile)
        return tasks
    #def _read_tasks_file

    def remove_task(self, task, *args):
        """Delete one entry of a stored task and renumber the remaining ones.

        *task* is a dict with at least ``name`` and ``serial``.  A serial
        prefixed with ``r`` designates the entry with that serial only when it
        is flagged ``spread``.  For retrocompatibility *task* may also be the
        task type string followed by name, serial and command as positional
        arguments.

        Returns ``{'status': bool, 'data': msg}``; status is True only when an
        entry was deleted and the file rewritten.
        """
        #Retrocompatibility
        if isinstance(task, str):
            task_compat = {'sw_remote': task == 'remote'}
            for key, value in zip(('name', 'serial', 'cmd'), args):
                if value:
                    task_compat[key] = value
            task = task_compat
        if not isinstance(task, dict):
            return {'status': False, 'data': "Invalid task description"}

        name = task.get('name')
        serial = task.get('serial')
        if not name or serial is None or str(serial) == '':
            return {'status': False, 'data': "Task name and serial are required"}
        serial = str(serial)

        self._debug("Removing task from system")
        sw_del = False
        msg = ''
        wrkfile = self._task_file(name)
        tasks = self._read_tasks_file(wrkfile)
        serials = tasks.get(name)
        if isinstance(serials, dict):
            if serial in serials:
                del serials[serial]
                sw_del = True
            elif serial.startswith('r'):
                local_serial = serial[1:]
                entry = serials.get(local_serial)
                if isinstance(entry, dict) and entry.get('spread'):
                    del serials[local_serial]
                    sw_del = True

        if sw_del:
            self._debug("Task deleted")
            try:
                self._write_json(wrkfile, self._serialize_task(tasks))
            except (IOError, OSError, TypeError, ValueError) as e:
                return {'status': False, 'data': str(e)}
            self._register_cron_update()
        return {'status': sw_del, 'data': msg}
    #def remove_task

    def _serialize_task(self, task):
        """Renumber the serials of every task as "1", "2", ... in serial order.

        Keys are strings, which is what they become anyway once the result is
        stored as JSON.  Values that are not dicts are kept untouched.
        """
        serial_task = {}
        for name, task_data in task.items():
            if not isinstance(task_data, dict):
                serial_task[name] = task_data
                continue
            ordered = sorted(task_data, key=self._serial_key)
            serial_task[name] = dict(
                (str(position), task_data[serial])
                for position, serial in enumerate(ordered, 1)
            )
        return serial_task
    #def _serialize_task

    def write_tasks(self, task_type, tasks):
        """Store or update one task entry.

        *tasks* is ``{name: {serial: data}}``; only the first name and its
        first serial are considered.  *task_type* 'local' or 'remote' sets the
        ``spread`` flag; any other value leaves the flag as provided.  When the
        serial (without its ``r`` prefix) already exists for that name the
        entry is replaced, otherwise it is appended under the next free
        serial.  The caller's dict is not modified.

        Returns ``{'status': bool, 'data': msg}`` with the error text in msg
        when status is False.
        """
        self._debug("Writing task info")
        if not isinstance(tasks, dict) or not tasks:
            return {'status': False, 'data': "No task to write"}
        task_name = list(tasks.keys())[0]
        serials = tasks[task_name]
        if not isinstance(serials, dict) or not serials:
            return {'status': False, 'data': "No data for task %s" % task_name}
        given_serial = list(serials.keys())[0]
        if not isinstance(serials[given_serial], dict):
            return {'status': False, 'data': "Invalid data for task %s" % task_name}

        task_serial = self._local_serial(given_serial)
        task_data = self._fill_task_data(dict(serials[given_serial]))
        if task_type == 'local':
            task_data['spread'] = False
        elif task_type == "remote":
            task_data['spread'] = True
        self._debug({task_name: {task_serial: task_data}})

        wrkfile = self._task_file(task_name)
        sched_tasks = {}
        try:
            if not os.path.isdir(self.tasks_dir):
                os.makedirs(self.tasks_dir)
            if os.path.isfile(wrkfile):
                sched_tasks = self._load_json(wrkfile)
        except (IOError, OSError, ValueError) as e:
            return {'status': False, 'data': str(e)}
        if not isinstance(sched_tasks, dict):
            return {'status': False, 'data': "%s does not contain a JSON object" % wrkfile}
        stored = sched_tasks.setdefault(task_name, {})
        if not isinstance(stored, dict):
            return {'status': False, 'data': "Malformed entries for %s in %s" % (task_name, wrkfile)}

        if task_serial in stored:
            #Modify
            self._debug("Modify item %s" % task_serial)
            stored[task_serial] = task_data
        else:
            #Add: next serial after the highest numeric one, so it never collides
            numeric = [int(key) for key in stored if str(key).isdigit()]
            new_serial = str(max(numeric) + 1) if numeric else "1"
            self._debug("Add item %s to %s" % (new_serial, wrkfile))
            stored[new_serial] = task_data

        try:
            self._write_json(wrkfile, sched_tasks)
        except (IOError, OSError, TypeError, ValueError) as e:
            return {'status': False, 'data': str(e)}
        # The trigger is only bumped once the new content is on disk.
        self._register_cron_update()
        self._debug("%s updated" % task_name)
        return {'status': True, 'data': ''}
    #def write_tasks

    def _fill_task_data(self, task):
        """Complete *task* in place with its ``kind`` list and ``spread`` flag.

        ``spread`` defaults to False when absent.  ``kind`` is ['fixed'] when
        month, day, hour and minute are all integers; otherwise it holds
        'daily' when ``dow`` is not '*' and 'repeat' when any of those four
        fields contains '/'.  Missing fields are treated as '*'.

        Returns the same dict.
        """
        task['kind'] = []
        if 'spread' not in task:
            task['spread'] = False
        #set task kind
        if task.get('dow', '*') != '*':
            task['kind'].append('daily')
        fields = [str(task.get(field, '*')) for field in ('mon', 'dom', 'h', 'm')]
        try:
            for value in fields:
                int(value)
        except ValueError:
            if '/' in ''.join(fields):
                task['kind'].append('repeat')
        else:
            task['kind'] = ['fixed']
        return task
    #def _fill_task_data

    def write_tasks_old(self, tasks):
        """Kept for interface compatibility; does nothing."""
        pass
    #def write_tasks_old

    def write_custom_task(self, cmd_name, cmd, parms):
        """Register a user defined command in the 'Personal' group.

        When *cmd* contains '%s' the placeholder is removed and *parms* is
        appended between single quotes; otherwise *parms* is appended after a
        space.  NOTE(review): *parms* is embedded without any escaping, so a
        value containing a single quote ends the quoting early.

        Returns ``{'status': bool, 'data': msg}`` with the error text in msg
        when status is False.
        """
        status = True
        msg = ''
        if '%s' in cmd:
            command_line = cmd.replace('%s', '') + " '" + parms + "'"
        else:
            command_line = cmd + ' ' + parms
        try:
            tasks = {}
            if os.path.isfile(self.custom_tasks):
                tasks = self._load_json(self.custom_tasks)
            if not isinstance(tasks, dict):
                raise ValueError("%s does not contain a JSON object" % self.custom_tasks)
            if not isinstance(tasks.get('Personal'), dict):
                if 'Personal' in tasks:
                    raise ValueError("Malformed 'Personal' group in %s" % self.custom_tasks)
                tasks['Personal'] = {}
            tasks['Personal'][cmd_name] = command_line
            self._write_json(self.custom_tasks, tasks)
        except (IOError, OSError, TypeError, ValueError) as e:
            status = False
            msg = str(e)
        return {'status': status, 'data': msg}
    #def write_custom_task

    def add_command(self, cmd_name, cmd):
        """Add or replace *cmd_name* in the commands file.

        Errors reading or writing the file are propagated to the caller.
        """
        self._debug("Adding command %s - %s" % (cmd_name, cmd))
        commands = {}
        if os.path.isfile(self.commands_file):
            commands = self._load_json(self.commands_file)
        commands.update({cmd_name: cmd})
        self._write_json(self.commands_file, commands)
    #def add_command

    def _register_cron_update(self):
        """Bump the SCHEDULED_TASKS variable that signals a task change.

        The counter wraps back to 1 once it reaches 1000.  ``objects`` is not
        defined in this file (presumably injected by the N4D runtime - TODO
        confirm); when it is missing the update is skipped.
        """
        self._debug("Registering trigger var")
        try:
            variables = objects["VariablesManager"]
        except NameError:
            self._debug("VariablesManager not available, trigger not updated")
            return
        if not variables.get_variable("SCHEDULED_TASKS"):
            self._debug("Initializing trigger var")
            variables.add_variable("SCHEDULED_TASKS", {}, "", "Scheduled tasks trigger", "n4d-scheduler-server", False, False)
        val = variables.get_variable("SCHEDULED_TASKS")
        # The variable is created holding {}, so anything that is not a
        # usable counter restarts the count.
        if not isinstance(val, int) or val >= 1000:
            val = 0
        val += 1
        variables.set_variable("SCHEDULED_TASKS", val)
        self._debug("New value is %s" % val)
    #def _register_cron_update
---|