source: taskscheduler/trunk/fuentes/server-scheduler.install/usr/share/n4d/python-plugins/SchedulerServer.py @ 6831

Last change on this file since 6831 was 6831, checked in by Juanma, 20 months ago

refactorized

File size: 8.1 KB
Line 
1#!/usr/bin/env python
2######
3#Scheduler library for Lliurex
4#Add to N4D the scheduled tasks
5#This class reads the json file with the scheduled tasks and
6#distributes the info among the clients
7# -*- coding: utf-8 -*-
8import os
9import json
10
11class SchedulerServer():
12        def __init__(self):
13                self.dbg=1
14                self.tasks_dir="/etc/scheduler/tasks.d"
15                self.schedTasksDir=self.tasks_dir+"/scheduled"
16                self.available_tasks_dir="/etc/scheduler/conf.d/tasks"
17                self.custom_tasks=self.available_tasks_dir+"/personal.json"
18                self.remote_tasks_dir=self.tasks_dir+"/remote"
19                self.local_tasks_dir=self.tasks_dir+"/local"
20                self.commands_file='/etc/scheduler/conf.d/commands/commands.json'
21        #def __init__
22
23        def _debug(self,msg):
24                if (self.dbg):
25                        print("Scheduler: %s" %msg)
26        #def _debug
27
28        def get_tasks(self):
29                return(self._read_wrkfiles(self.tasks_dir))
30        #def get_tasks
31
32        def get_local_tasks(self):
33                local_tasks={}
34                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
35
36                for task_name,serial_data in tasks_data.items():
37                        sw_continue=False
38                        for serial,data in serial_data.items():
39                                if 'spread' in data.keys():
40                                        if data['spread']==False:
41                                                if task_name in local_tasks.keys():
42#                                                       remote_tasks[task_name].update({'r'+serial:tasks_data[task_name][serial]})
43                                                        local_tasks[task_name]['r'+serial]=tasks_data[task_name][serial]
44                                                else:
45                                                        local_tasks[task_name]={'r'+serial:tasks_data[task_name][serial]}
46                                else:
47                                        if task_name in local_tasks.keys():
48                                                local_tasks[task_name]['r'+serial]=tasks_data[task_name][serial]
49                                        else:
50                                                local_tasks[task_name]={'r'+serial:tasks_data[task_name][serial]}
51                return local_tasks
52
53        def get_remote_tasks(self):
54                remote_tasks={}
55                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
56
57                for task_name,serial_data in tasks_data.items():
58                        sw_continue=False
59                        for serial,data in serial_data.items():
60                                if 'spread' in data.keys():
61                                        if data['spread']==True:
62                                                if task_name in remote_tasks.keys():
63#                                                       remote_tasks[task_name].update({'r'+serial:tasks_data[task_name][serial]})
64                                                        remote_tasks[task_name]['r'+serial]=tasks_data[task_name][serial]
65                                                else:
66                                                        remote_tasks[task_name]={'r'+serial:tasks_data[task_name][serial]}
67                return remote_tasks
68
69        def get_available_tasks(self):
70                return(self._read_wrkfiles(self.available_tasks_dir))
71
72        def _read_wrkfiles(self,folder):
73                tasks={}
74                wrkfiles=self._get_wrkfiles(folder)
75                self._debug(folder)
76                for wrkfile in wrkfiles:
77                        task=self._read_tasks_file(wrkfile)
78                        if task:
79                                tasks.update(task)
80                self._debug("Tasks loaded")
81                self._debug(str(tasks))
82                return({'status':True,'data':tasks})
83
84        def _get_wrkfiles(self,folder):
85                wrkfiles=[]
86                if not os.path.isdir(folder):
87                        os.makedirs(folder)
88                for f in os.listdir(folder):
89                        wrkfiles.append(folder+'/'+f)
90                return wrkfiles
91        #def _get_wrkfiles
92
93        def _read_tasks_file(self,wrkfile):
94                self._debug("Opening %s" % wrkfile)
95                tasks={}
96                if os.path.isfile(wrkfile):
97                        try:
98#                               with open(wrkfile,"rb") as fh:
99#                                       tasks=json.load(fh)
100                                tasks=json.loads(open(wrkfile,"rb").read())
101                        except Exception as e:
102                                errormsg=(("unable to open %s") % wrkfile)
103                                errormsg=(("Reason: %s") %e)
104                                self._debug(errormsg)
105                return(tasks)
106        #def _read_tasks_file
107       
108        def remove_task(self,task):
109                wrk_dir=self.tasks_dir
110                self._debug("Removing task from system")
111                sw_del=False
112                msg=''
113                wrkfile=wrk_dir+'/'+task['name']
114                wrkfile=wrkfile.replace(' ','_')
115                tasks=self._read_tasks_file(wrkfile)
116                if task['name'] in tasks.keys():
117                        if task['serial'] in tasks[task['name']].keys():
118                                del tasks[task['name']][task['serial']]
119                                self._debug("Task deleted")
120                                sw_del=True
121
122                if sw_del:
123                        tasks=self._serialize_task(tasks)
124                        with open(wrkfile,'w') as json_data:
125                                json.dump(tasks,json_data,indent=4)
126                        self._register_cron_update()
127                return ({'status':sw_del,'data':msg})
128        #def remove_task
129
130        def _serialize_task(self,task):
131                serial_task={}
132                for name,task_data in task.items():
133                        cont=0
134                        serial_task[name]={}
135                        for serial,data in task_data.items():
136                                serial_task[name].update({cont+1:data})
137                                cont+=1
138                return(serial_task)
139        #def _serialize_task
140
141        def write_tasks(self,task_type,tasks):
142                wrk_dir=self.tasks_dir
143                self._debug("Writing task info")
144                msg=''
145                status=True
146                task_name=list(tasks.keys())[0]
147                task_serial=list(tasks[task_name].keys())[0]
148                task_data=tasks[task_name][task_serial]
149                del tasks[task_name][task_serial]
150                task_serial=task_serial.strip("r")
151                tasks[task_name]={task_serial:task_data}
152                self._debug(tasks)
153                serialized_task={}
154                sched_tasks={}
155                if not os.path.isdir(wrk_dir):
156                        os.makedirs(wrk_dir)
157
158                wrkfile=wrk_dir+'/'+task_name
159                wrkfile=wrkfile.replace(' ','_')
160                if os.path.isfile(wrkfile):
161                        sched_tasks=json.loads(open(wrkfile).read())
162                        serial=len(sched_tasks[task_name])
163                        data=self._fill_task_data(tasks[task_name][task_serial])
164                        tasks[task_name][task_serial]=data
165                        if task_type=='local':
166                                tasks[task_name][task_serial].update({'spread':False})
167                        elif task_type=="remote":
168                                tasks[task_name][task_serial].update({'spread':True})
169
170                        if task_serial in sched_tasks[task_name].keys():
171                                #Modify
172                                self._debug("Modify item %s" % serial)
173                                sched_tasks[task_name][task_serial]=tasks[task_name][task_serial]
174                        else:
175                                #Add
176                                self._debug("Add item %s" % serial)
177                                serialized_data={}
178                                data=self._fill_task_data(tasks[task_name][task_serial])
179                                tasks[task_name][task_serial]=data
180                                self._fill_task_data(tasks[task_name][task_serial])
181                                serialized_data[serial+1]=tasks[task_name][task_serial]
182                                sched_tasks[task_name].update(serialized_data)
183                else:
184                        self._debug("Add new item 1 to %s"%wrkfile)
185                        data=self._fill_task_data(tasks[task_name]["0"])
186                        tasks[task_name]["0"]=data
187                        if task_type=='local':
188                                tasks[task_name]["0"].update({'spread':False})
189                        elif task_type=="remote":
190                                tasks[task_name]["0"].update({'spread':True})
191                        tasks[task_name]={"1":tasks[task_name]["0"]}
192                        sched_tasks=tasks.copy()
193
194                try:
195                        with open(wrkfile,'w') as json_data:
196                                json.dump(sched_tasks,json_data,indent=4)
197                except Exception as e:
198                        msg=e
199                        status=False
200                self._register_cron_update()
201                self._debug("%s updated" % task_name)
202                return({'status':status,'data':msg})
203        #def write_tasks
204
205        def _fill_task_data(self,task):
206                task['kind']=[]
207                if ['spread'] not in task.keys():
208                        task['spread']=False
209                #set task kind
210                if task['dow']!='*':
211                        task['kind'].append('daily')
212                try:
213                        int(task['mon'])
214                        int(task['dom'])
215                        int(task['h'])
216                        int(task['m'])
217                        task['kind']=['fixed']
218                except:
219                        if '/' in (task['mon']+task['dom']+task['h']+task['m']):
220                                task['kind'].append('repeat')
221                return task
222
223        def write_tasks_old(self,tasks):
224                pass
225        #def write_tasks
226
227        def write_custom_task(self,cmd_name,cmd,parms):
228                status=True
229                msg=''
230                tasks={}
231                new_task={}
232                if os.path.isfile(self.custom_tasks):
233                        tasks=json.loads(open(self.custom_tasks).read())
234                        if not 'Personal' in tasks.keys():
235                                tasks['Personal']={}
236                else:
237                        tasks['Personal']={}
238                if '%s' in cmd:
239                        cmd=cmd.replace('%s','')
240                        new_task[cmd_name]=cmd+" '"+parms+"'"
241                else:
242                        new_task[cmd_name]=cmd+' '+parms
243                tasks['Personal'].update(new_task)
244                try:
245                        with open(self.custom_tasks,'w') as json_data:
246                                json.dump(tasks,json_data,indent=4)
247                except Exception as e:
248                        status=False
249                        msg=e
250                return({'status':status,'data':msg})
251        #def write_custom_task
252
253        def add_command(self,cmd_name,cmd):
254                self._debug("Adding command %s - %s"%(cmd_name,cmd))
255                commands={}
256                dict_cmd={cmd_name:cmd}
257                if os.path.isfile(self.commands_file):
258                        commands=json.loads(open(self.commands_file,"rb").read())
259                commands.update(dict_cmd)
260                with open(self.commands_file,'w') as json_data:
261                        json.dump(commands,json_data,indent=4)
262        #def add_command
263
264        def _register_cron_update(self):
265                self._debug("Registering trigger var")
266                val=0
267                if not objects["VariablesManager"].get_variable("SCHEDULED_TASKS"):
268                        self._debug("Initializing trigger var")
269                        objects["VariablesManager"].add_variable("SCHEDULED_TASKS",{},"","Scheduled tasks trigger","n4d-scheduler-server",False,False)
270                val=objects["VariablesManager"].get_variable("SCHEDULED_TASKS")
271                if not val:
272                        val=0
273                if val>=1000:
274                        val=0
275                val+=1
276                objects["VariablesManager"].set_variable("SCHEDULED_TASKS",val)
277                self._debug("New value is %s"%val)
278        #def _register_cron_update
Note: See TracBrowser for help on using the repository browser.