source: taskscheduler/trunk/fuentes/server-scheduler.install/usr/share/n4d/python-plugins/SchedulerServer.py @ 6866

Last change on this file since 6866 was 6866, checked in by Juanma, 19 months ago

refactorized

File size: 8.7 KB
Line 
1#!/usr/bin/env python
2######
3#Scheduler library for Lliurex
4#Add to N4D the scheduled tasks
5#This class reads the json file with the scheduled tasks and
6#distributes the info among the clients
7# -*- coding: utf-8 -*-
8import os
9import json
10
11class SchedulerServer():
12        def __init__(self):
13                self.dbg=0
14                self.tasks_dir="/etc/scheduler/tasks.d"
15                self.schedTasksDir=self.tasks_dir+"/scheduled"
16                self.available_tasks_dir="/etc/scheduler/conf.d/tasks"
17                self.custom_tasks=self.available_tasks_dir+"/personal.json"
18                self.remote_tasks_dir=self.tasks_dir+"/remote"
19                self.local_tasks_dir=self.tasks_dir+"/local"
20                self.commands_file='/etc/scheduler/conf.d/commands/commands.json'
21        #def __init__
22
23        def _debug(self,msg):
24                if (self.dbg):
25                        print("Scheduler: %s" %msg)
26        #def _debug
27
28        def get_tasks(self,*args):
29                return(self._read_wrkfiles(self.tasks_dir))
30        #def get_tasks
31
32        def get_local_tasks(self,*args):
33                local_tasks={}
34                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
35                status=False
36                for task_name,serial_data in tasks_data.items():
37                        sw_continue=False
38                        for serial,data in serial_data.items():
39                                if 'spread' in data.keys():
40                                        if data['spread']==False:
41                                                if task_name in local_tasks.keys():
42                                                        local_tasks[task_name][serial]=tasks_data[task_name][serial]
43                                                        status=True
44                                                else:
45                                                        local_tasks[task_name]={serial:tasks_data[task_name][serial]}
46                                                        status=True
47                                else:
48                                        if task_name in local_tasks.keys():
49                                                local_tasks[task_name][serial]=tasks_data[task_name][serial]
50                                                status=True
51                                        else:
52                                                local_tasks[task_name]={serial:tasks_data[task_name][serial]}
53                                                status=True
54                return ({'status':status,'data':local_tasks})
55
56        def get_remote_tasks(self,*args):
57                remote_tasks={}
58                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
59                status=False
60
61                for task_name,serial_data in tasks_data.items():
62                        sw_continue=False
63                        for serial,data in serial_data.items():
64                                if 'spread' in data.keys():
65                                        if data['spread']==True:
66                                                if task_name in remote_tasks.keys():
67                                                        remote_tasks[task_name]['r'+serial]=tasks_data[task_name][serial]
68                                                        status=True
69                                                else:
70                                                        remote_tasks[task_name]={'r'+serial:tasks_data[task_name][serial]}
71                                                        status=True
72                return ({'status':status,'data':remote_tasks})
73
74        def get_available_tasks(self):
75                return(self._read_wrkfiles(self.available_tasks_dir))
76
77        def _read_wrkfiles(self,folder):
78                tasks={}
79                wrkfiles=self._get_wrkfiles(folder)
80                self._debug(folder)
81                for wrkfile in wrkfiles:
82                        task=self._read_tasks_file(wrkfile)
83                        if task:
84                                tasks.update(task)
85                self._debug("Tasks loaded")
86                self._debug(str(tasks))
87                return({'status':True,'data':tasks})
88
89        def _get_wrkfiles(self,folder):
90                wrkfiles=[]
91                if not os.path.isdir(folder):
92                        os.makedirs(folder)
93                for f in os.listdir(folder):
94                        wrkfiles.append(folder+'/'+f)
95                return wrkfiles
96        #def _get_wrkfiles
97
98        def _read_tasks_file(self,wrkfile):
99                self._debug("Opening %s" % wrkfile)
100                tasks={}
101                if os.path.isfile(wrkfile):
102                        try:
103                                tasks=json.loads(open(wrkfile,"rb").read())
104                        except Exception as e:
105                                errormsg=(("unable to open %s") % wrkfile)
106                                errormsg=(("Reason: %s") %e)
107                                self._debug(errormsg)
108                return(tasks)
109        #def _read_tasks_file
110       
111        def remove_task(self,task,*args):
112                #Retrocompatibility
113                if type(task)==type(""):
114                        task_compat={}
115                        if task=='remote':
116                                task_compat['sw_remote']=True
117                        else:
118                                task_compat['sw_remote']=False
119                        if args[0]:
120                                task_compat['name']=args[0]
121                        if args[1]:
122                                task_compat['serial']=args[1]
123                        if args[2]:
124                                task_compat['cmd']=args[2]
125                        task=task_compat
126                wrk_dir=self.tasks_dir
127                self._debug("Removing task from system")
128                sw_del=False
129                msg=''
130                wrkfile=wrk_dir+'/'+task['name']
131                wrkfile=wrkfile.replace(' ','_')
132                tasks=self._read_tasks_file(wrkfile)
133                if task['name'] in tasks.keys():
134                        if task['serial'] in tasks[task['name']].keys():
135                                del tasks[task['name']][task['serial']]
136                                self._debug("Task deleted")
137                                sw_del=True
138                        elif task['serial'][0]=='r':
139                                serial=task['serial'].strip('r')
140                                if serial in tasks[task['name']].keys():
141                                        if tasks[task['name']][serial]['spread']:
142                                                del tasks[task['name']][serial]
143                                                self._debug("Task deleted")
144                                                sw_del=True
145
146
147                if sw_del:
148                        tasks=self._serialize_task(tasks)
149                        with open(wrkfile,'w') as json_data:
150                                json.dump(tasks,json_data,indent=4)
151                        self._register_cron_update()
152                return ({'status':sw_del,'data':msg})
153        #def remove_task
154
155        def _serialize_task(self,task):
156                serial_task={}
157                for name,task_data in task.items():
158                        cont=0
159                        serial_task[name]={}
160                        for serial,data in task_data.items():
161                                serial_task[name].update({cont+1:data})
162                                cont+=1
163                return(serial_task)
164        #def _serialize_task
165
166        def write_tasks(self,task_type,tasks):
167                wrk_dir=self.tasks_dir
168                self._debug("Writing task info")
169                msg=''
170                status=True
171                task_name=list(tasks.keys())[0]
172                task_serial=list(tasks[task_name].keys())[0]
173                task_data=tasks[task_name][task_serial]
174                del tasks[task_name][task_serial]
175                task_serial=task_serial.strip("r")
176                tasks[task_name]={task_serial:task_data}
177                self._debug(tasks)
178                serialized_task={}
179                sched_tasks={}
180                if not os.path.isdir(wrk_dir):
181                        os.makedirs(wrk_dir)
182
183                wrkfile=wrk_dir+'/'+task_name
184                wrkfile=wrkfile.replace(' ','_')
185                if os.path.isfile(wrkfile):
186                        sched_tasks=json.loads(open(wrkfile).read())
187                        serial=len(sched_tasks[task_name])
188                        data=self._fill_task_data(tasks[task_name][task_serial])
189                        tasks[task_name][task_serial]=data
190                        if task_type=='local':
191                                tasks[task_name][task_serial].update({'spread':False})
192                        elif task_type=="remote":
193                                tasks[task_name][task_serial].update({'spread':True})
194
195                        if task_serial in sched_tasks[task_name].keys():
196                                #Modify
197                                self._debug("Modify item %s" % serial)
198                                sched_tasks[task_name][task_serial]=tasks[task_name][task_serial]
199                        else:
200                                #Add
201                                self._debug("Add item %s" % serial)
202                                serialized_data={}
203                                data=self._fill_task_data(tasks[task_name][task_serial])
204                                tasks[task_name][task_serial]=data
205                                self._fill_task_data(tasks[task_name][task_serial])
206                                serialized_data[serial+1]=tasks[task_name][task_serial]
207                                sched_tasks[task_name].update(serialized_data)
208                else:
209                        self._debug("Add new item 1 to %s"%wrkfile)
210                        data=self._fill_task_data(tasks[task_name]["0"])
211                        tasks[task_name]["0"]=data
212                        if task_type=='local':
213                                tasks[task_name]["0"].update({'spread':False})
214                        elif task_type=="remote":
215                                tasks[task_name]["0"].update({'spread':True})
216                        tasks[task_name]={"1":tasks[task_name]["0"]}
217                        sched_tasks=tasks.copy()
218
219                try:
220                        with open(wrkfile,'w') as json_data:
221                                json.dump(sched_tasks,json_data,indent=4)
222                except Exception as e:
223                        msg=e
224                        status=False
225                self._register_cron_update()
226                self._debug("%s updated" % task_name)
227                return({'status':status,'data':msg})
228        #def write_tasks
229
230        def _fill_task_data(self,task):
231                task['kind']=[]
232                if ['spread'] not in task.keys():
233                        task['spread']=False
234                #set task kind
235                if task['dow']!='*':
236                        task['kind'].append('daily')
237                try:
238                        int(task['mon'])
239                        int(task['dom'])
240                        int(task['h'])
241                        int(task['m'])
242                        task['kind']=['fixed']
243                except:
244                        if '/' in (task['mon']+task['dom']+task['h']+task['m']):
245                                task['kind'].append('repeat')
246                return task
247
248        def write_tasks_old(self,tasks):
249                pass
250        #def write_tasks
251
252        def write_custom_task(self,cmd_name,cmd,parms):
253                status=True
254                msg=''
255                tasks={}
256                new_task={}
257                if os.path.isfile(self.custom_tasks):
258                        tasks=json.loads(open(self.custom_tasks).read())
259                        if not 'Personal' in tasks.keys():
260                                tasks['Personal']={}
261                else:
262                        tasks['Personal']={}
263                if '%s' in cmd:
264                        cmd=cmd.replace('%s','')
265                        new_task[cmd_name]=cmd+" '"+parms+"'"
266                else:
267                        new_task[cmd_name]=cmd+' '+parms
268                tasks['Personal'].update(new_task)
269                try:
270                        with open(self.custom_tasks,'w') as json_data:
271                                json.dump(tasks,json_data,indent=4)
272                except Exception as e:
273                        status=False
274                        msg=e
275                return({'status':status,'data':msg})
276        #def write_custom_task
277
278        def add_command(self,cmd_name,cmd):
279                self._debug("Adding command %s - %s"%(cmd_name,cmd))
280                commands={}
281                dict_cmd={cmd_name:cmd}
282                if os.path.isfile(self.commands_file):
283                        commands=json.loads(open(self.commands_file,"rb").read())
284                commands.update(dict_cmd)
285                with open(self.commands_file,'w') as json_data:
286                        json.dump(commands,json_data,indent=4)
287        #def add_command
288
289        def _register_cron_update(self):
290                self._debug("Registering trigger var")
291                val=0
292                if not objects["VariablesManager"].get_variable("SCHEDULED_TASKS"):
293                        self._debug("Initializing trigger var")
294                        objects["VariablesManager"].add_variable("SCHEDULED_TASKS",{},"","Scheduled tasks trigger","n4d-scheduler-server",False,False)
295                val=objects["VariablesManager"].get_variable("SCHEDULED_TASKS")
296                if not val:
297                        val=0
298                if val>=1000:
299                        val=0
300                val+=1
301                objects["VariablesManager"].set_variable("SCHEDULED_TASKS",val)
302                self._debug("New value is %s"%val)
303        #def _register_cron_update
Note: See TracBrowser for help on using the repository browser.