source: taskscheduler/trunk/fuentes/server-scheduler.install/usr/share/n4d/python-plugins/SchedulerServer.py

Last change on this file was 7607, checked in by Juanma, 16 months ago

disable debug

File size: 9.2 KB
Line 
1#!/usr/bin/env python
2######
3#Scheduler library for Lliurex
4#Add to N4D the scheduled tasks
5#This class reads the json file with the scheduled tasks and
6#distributes the info among the clients
7# -*- coding: utf-8 -*-
8import os
9import json
10from  datetime import date
11
12class SchedulerServer():
13        def __init__(self):
14                self.dbg=0
15                self.tasks_dir="/etc/scheduler/tasks.d"
16                self.schedTasksDir=self.tasks_dir+"/scheduled"
17                self.available_tasks_dir="/etc/scheduler/conf.d/tasks"
18                self.custom_tasks=self.available_tasks_dir+"/personal.json"
19                self.remote_tasks_dir=self.tasks_dir+"/remote"
20                self.local_tasks_dir=self.tasks_dir+"/local"
21                self.commands_file='/etc/scheduler/conf.d/commands/commands.json'
22        #def __init__
23
24        def _debug(self,msg):
25                if (self.dbg):
26                        print("Scheduler: %s" %msg)
27        #def _debug
28
29        def get_tasks(self,*args):
30                return(self._read_wrkfiles(self.tasks_dir))
31        #def get_tasks
32
33        def get_local_tasks(self,*args):
34                today=date.today()
35                local_tasks={}
36                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
37                status=False
38                for task_name,serial_data in tasks_data.items():
39                        sw_continue=False
40                        for serial,data in serial_data.items():
41                                print("DATA: %s"%data)
42                                sw_pass=False
43                                if 'autoremove' in data.keys():
44                                        if (data['mon'].isdigit()):
45                                                mon=int(data['mon'])
46                                                if mon<today.month:
47                                                        sw_pass=True
48                                        if sw_pass==False:
49                                                if (data['dom'].isdigit()):
50                                                        dom=int(data['dom'])
51                                                        if dom<today.day:
52                                                                sw_pass=True
53                                        if sw_pass:
54                                                task={}
55                                                self._debug("Autoremoving %s %s"%(task_name,serial))
56                                                task['name']=task_name
57                                                task['serial']=serial
58                                                self.remove_task(task)
59                                                continue
60                                if 'spread' in data.keys():
61                                        if data['spread']==False:
62                                                if task_name in local_tasks.keys():
63                                                        local_tasks[task_name][serial]=tasks_data[task_name][serial]
64                                                        status=True
65                                                else:
66                                                        local_tasks[task_name]={serial:tasks_data[task_name][serial]}
67                                                        status=True
68                                else:
69                                        if task_name in local_tasks.keys():
70                                                local_tasks[task_name][serial]=tasks_data[task_name][serial]
71                                                status=True
72                                        else:
73                                                local_tasks[task_name]={serial:tasks_data[task_name][serial]}
74                                                status=True
75                return ({'status':status,'data':local_tasks})
76
77        def get_remote_tasks(self,*args):
78                remote_tasks={}
79                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
80                status=False
81
82                for task_name,serial_data in tasks_data.items():
83                        sw_continue=False
84                        for serial,data in serial_data.items():
85                                if 'spread' in data.keys():
86                                        if data['spread']==True:
87                                                if task_name in remote_tasks.keys():
88                                                        remote_tasks[task_name]['r'+serial]=tasks_data[task_name][serial]
89                                                        status=True
90                                                else:
91                                                        remote_tasks[task_name]={'r'+serial:tasks_data[task_name][serial]}
92                                                        status=True
93                return ({'status':status,'data':remote_tasks})
94
95        def get_available_tasks(self):
96                return(self._read_wrkfiles(self.available_tasks_dir))
97
98        def _read_wrkfiles(self,folder):
99                tasks={}
100                wrkfiles=self._get_wrkfiles(folder)
101                self._debug(folder)
102                for wrkfile in wrkfiles:
103                        task=self._read_tasks_file(wrkfile)
104                        if task:
105                                tasks.update(task)
106                self._debug("Tasks loaded")
107                self._debug(str(tasks))
108                return({'status':True,'data':tasks})
109
110        def _get_wrkfiles(self,folder):
111                wrkfiles=[]
112                if not os.path.isdir(folder):
113                        os.makedirs(folder)
114                for f in os.listdir(folder):
115                        wrkfiles.append(folder+'/'+f)
116                return wrkfiles
117        #def _get_wrkfiles
118
119        def _read_tasks_file(self,wrkfile):
120                self._debug("Opening %s" % wrkfile)
121                tasks={}
122                if os.path.isfile(wrkfile):
123                        try:
124                                tasks=json.loads(open(wrkfile,"rb").read())
125                        except Exception as e:
126                                errormsg=(("unable to open %s") % wrkfile)
127                                errormsg=(("Reason: %s") %e)
128                                self._debug(errormsg)
129                return(tasks)
130        #def _read_tasks_file
131       
132        def remove_task(self,task,*args):
133                #Retrocompatibility
134                if type(task)==type(""):
135                        task_compat={}
136                        if task=='remote':
137                                task_compat['sw_remote']=True
138                        else:
139                                task_compat['sw_remote']=False
140                        if args[0]:
141                                task_compat['name']=args[0]
142                        if args[1]:
143                                task_compat['serial']=args[1]
144                        if args[2]:
145                                task_compat['cmd']=args[2]
146                        task=task_compat
147                wrk_dir=self.tasks_dir
148                self._debug("Removing task from system")
149                sw_del=False
150                msg=''
151                wrkfile=wrk_dir+'/'+task['name']
152                wrkfile=wrkfile.replace(' ','_')
153                tasks=self._read_tasks_file(wrkfile)
154                if task['name'] in tasks.keys():
155                        if task['serial'] in tasks[task['name']].keys():
156                                del tasks[task['name']][task['serial']]
157                                self._debug("Task deleted")
158                                sw_del=True
159                        elif task['serial'][0]=='r':
160                                serial=task['serial'].strip('r')
161                                if serial in tasks[task['name']].keys():
162                                        if tasks[task['name']][serial]['spread']:
163                                                del tasks[task['name']][serial]
164                                                self._debug("Task deleted")
165                                                sw_del=True
166
167
168                if sw_del:
169                        tasks=self._serialize_task(tasks)
170                        with open(wrkfile,'w') as json_data:
171                                json.dump(tasks,json_data,indent=4)
172                        self._register_cron_update()
173                return ({'status':sw_del,'data':msg})
174        #def remove_task
175
176        def _serialize_task(self,task):
177                serial_task={}
178                for name,task_data in task.items():
179                        cont=0
180                        serial_task[name]={}
181                        for serial,data in task_data.items():
182                                serial_task[name].update({cont+1:data})
183                                cont+=1
184                return(serial_task)
185        #def _serialize_task
186
187        def write_tasks(self,task_type,tasks):
188                wrk_dir=self.tasks_dir
189                self._debug("Writing task info")
190                msg=''
191                status=True
192                task_name=list(tasks.keys())[0]
193                task_serial=list(tasks[task_name].keys())[0]
194                task_data=tasks[task_name][task_serial]
195                del tasks[task_name][task_serial]
196                task_serial=task_serial.strip("r")
197                tasks[task_name]={task_serial:task_data}
198                self._debug(tasks)
199                serialized_task={}
200                sched_tasks={}
201                if not os.path.isdir(wrk_dir):
202                        os.makedirs(wrk_dir)
203
204                wrkfile=wrk_dir+'/'+task_name
205                wrkfile=wrkfile.replace(' ','_')
206                if os.path.isfile(wrkfile):
207                        sched_tasks=json.loads(open(wrkfile).read())
208                        serial=len(sched_tasks[task_name])
209                        data=self._fill_task_data(tasks[task_name][task_serial])
210                        tasks[task_name][task_serial]=data
211                        if task_type=='local':
212                                tasks[task_name][task_serial].update({'spread':False})
213                        elif task_type=="remote":
214                                tasks[task_name][task_serial].update({'spread':True})
215
216                        if task_serial in sched_tasks[task_name].keys():
217                                #Modify
218                                self._debug("Modify item %s" % serial)
219                                sched_tasks[task_name][task_serial]=tasks[task_name][task_serial]
220                        else:
221                                #Add
222                                self._debug("Add item %s" % serial)
223                                serialized_data={}
224                                data=self._fill_task_data(tasks[task_name][task_serial])
225                                tasks[task_name][task_serial]=data
226                                self._fill_task_data(tasks[task_name][task_serial])
227                                serialized_data[serial+1]=tasks[task_name][task_serial]
228                                sched_tasks[task_name].update(serialized_data)
229                else:
230                        self._debug("Add new item 1 to %s"%wrkfile)
231                        data=self._fill_task_data(tasks[task_name]["0"])
232                        tasks[task_name]["0"]=data
233                        if task_type=='local':
234                                tasks[task_name]["0"].update({'spread':False})
235                        elif task_type=="remote":
236                                tasks[task_name]["0"].update({'spread':True})
237                        tasks[task_name]={"1":tasks[task_name]["0"]}
238                        sched_tasks=tasks.copy()
239
240                try:
241                        with open(wrkfile,'w') as json_data:
242                                json.dump(sched_tasks,json_data,indent=4)
243                except Exception as e:
244                        msg=e
245                        status=False
246                self._register_cron_update()
247                self._debug("%s updated" % task_name)
248                return({'status':status,'data':msg})
249        #def write_tasks
250
251        def _fill_task_data(self,task):
252                task['kind']=[]
253                if ['spread'] not in task.keys():
254                        task['spread']=False
255                #set task kind
256                if task['dow']!='*':
257                        task['kind'].append('daily')
258                try:
259                        int(task['mon'])
260                        int(task['dom'])
261                        int(task['h'])
262                        int(task['m'])
263                        task['kind']=['fixed']
264                except:
265                        if '/' in (task['mon']+task['dom']+task['h']+task['m']):
266                                task['kind'].append('repeat')
267                return task
268
269        def write_tasks_old(self,tasks):
270                pass
271        #def write_tasks
272
273        def write_custom_task(self,cmd_name,cmd,parms):
274                status=True
275                msg=''
276                tasks={}
277                new_task={}
278                if os.path.isfile(self.custom_tasks):
279                        tasks=json.loads(open(self.custom_tasks).read())
280                        if not 'Personal' in tasks.keys():
281                                tasks['Personal']={}
282                else:
283                        tasks['Personal']={}
284                if '%s' in cmd:
285                        cmd=cmd.replace('%s','')
286                        new_task[cmd_name]=cmd+" '"+parms+"'"
287                else:
288                        new_task[cmd_name]=cmd+' '+parms
289                tasks['Personal'].update(new_task)
290                try:
291                        with open(self.custom_tasks,'w') as json_data:
292                                json.dump(tasks,json_data,indent=4)
293                except Exception as e:
294                        status=False
295                        msg=e
296                return({'status':status,'data':msg})
297        #def write_custom_task
298
299        def add_command(self,cmd_name,cmd):
300                self._debug("Adding command %s - %s"%(cmd_name,cmd))
301                commands={}
302                dict_cmd={cmd_name:cmd}
303                if os.path.isfile(self.commands_file):
304                        commands=json.loads(open(self.commands_file,"rb").read())
305                commands.update(dict_cmd)
306                with open(self.commands_file,'w') as json_data:
307                        json.dump(commands,json_data,indent=4)
308        #def add_command
309
310        def _register_cron_update(self):
311                self._debug("Registering trigger var")
312                val=0
313                if not objects["VariablesManager"].get_variable("SCHEDULED_TASKS"):
314                        self._debug("Initializing trigger var")
315                        objects["VariablesManager"].add_variable("SCHEDULED_TASKS",{},"","Scheduled tasks trigger","n4d-scheduler-server",False,False)
316                val=objects["VariablesManager"].get_variable("SCHEDULED_TASKS")
317                if not val:
318                        val=0
319                if val>=1000:
320                        val=0
321                val+=1
322                objects["VariablesManager"].set_variable("SCHEDULED_TASKS",val)
323                self._debug("New value is %s"%val)
324        #def _register_cron_update
Note: See TracBrowser for help on using the repository browser.