source: taskscheduler/trunk/fuentes/server-scheduler.install/usr/share/n4d/python-plugins/SchedulerServer.py @ 6818

Last change on this file since 6818 was 6818, checked in by Juanma, 19 months ago

refactorized

File size: 7.3 KB
Line 
1#!/usr/bin/env python
2######
3#Scheduler library for Lliurex
4#Add to N4D the scheduled tasks
5#This class reads the json file with the scheduled tasks and
6#distributes the info among the clients
7# -*- coding: utf-8 -*-
8import os
9import json
10
11class SchedulerServer():
12        def __init__(self):
13                self.dbg=1
14                self.tasks_dir="/etc/scheduler/tasks.d"
15                self.schedTasksDir=self.tasks_dir+"/scheduled"
16                self.available_tasks_dir="/etc/scheduler/conf.d/tasks"
17                self.custom_tasks=self.available_tasks_dir+"/personal.json"
18                self.remote_tasks_dir=self.tasks_dir+"/remote"
19                self.local_tasks_dir=self.tasks_dir+"/local"
20        #def __init__
21
22        def _debug(self,msg):
23                if (self.dbg):
24                        print("Scheduler: %s" %msg)
25        #def _debug
26
27        def get_tasks(self):
28                return(self._read_wrkfiles(self.tasks_dir))
29        #def get_tasks
30
31        def get_local_tasks(self):
32                local_tasks={}
33                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
34                for task_serial in tasks_data.keys():
35                        sw_continue=False
36                        for task in task_serial.keys():
37                                if ['spread'] in task.keys():
38                                                if task['spread']==True:
39                                                        sw_continue=True
40                                                        break
41                        if sw_continue:
42                                continue
43                        local_tasks.update(task_serial)
44                return local_tasks
45
46        def get_remote_tasks(self):
47                remote_tasks={}
48                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
49
50                for task_name,serial_data in tasks_data.items():
51                        sw_continue=False
52                        for serial,data in serial_data.items():
53                                if 'spread' in data.keys():
54                                        if data['spread']==True:
55                                                if task_name in remote_tasks.keys():
56#                                                       remote_tasks[task_name].update({'r'+serial:tasks_data[task_name][serial]})
57                                                        remote_tasks[task_name]['r'+serial]=tasks_data[task_name][serial]
58                                                else:
59                                                        remote_tasks[task_name]={'r'+serial:tasks_data[task_name][serial]}
60                return remote_tasks
61
62        def get_available_tasks(self):
63                return(self._read_wrkfiles(self.available_tasks_dir))
64
65        def _read_wrkfiles(self,folder):
66                tasks={}
67                wrkfiles=self._get_wrkfiles(folder)
68                self._debug(folder)
69                for wrkfile in wrkfiles:
70                        task=self._read_tasks_file(wrkfile)
71                        if task:
72                                tasks.update(task)
73                self._debug("Tasks loaded")
74                self._debug(str(tasks))
75                return({'status':True,'data':tasks})
76
77        def _get_wrkfiles(self,folder):
78                wrkfiles=[]
79                if not os.path.isdir(folder):
80                        os.makedirs(folder)
81                for f in os.listdir(folder):
82                        wrkfiles.append(folder+'/'+f)
83                return wrkfiles
84        #def _get_wrkfiles
85
86        def _read_tasks_file(self,wrkfile):
87                self._debug("Opening %s" % wrkfile)
88                tasks={}
89                if os.path.isfile(wrkfile):
90                        try:
91#                               with open(wrkfile,"rb") as fh:
92#                                       tasks=json.load(fh)
93                                tasks=json.loads(open(wrkfile,"rb").read())
94                        except Exception as e:
95                                errormsg=(("unable to open %s") % wrkfile)
96                                errormsg=(("Reason: %s") %e)
97                                self._debug(errormsg)
98                return(tasks)
99        #def _read_tasks_file
100       
101        def remove_task(self,task):
102                wrk_dir=self.tasks_dir
103                self._debug("Removing task from system")
104                sw_del=False
105                msg=''
106                wrkfile=wrk_dir+'/'+task['name']
107                wrkfile=wrkfile.replace(' ','_')
108                tasks=self._read_tasks_file(wrkfile)
109                if task['name'] in tasks.keys():
110                        if task['serial'] in tasks[task['name']].keys():
111                                del tasks[task['name']][task['serial']]
112                                self._debug("Task deleted")
113                                sw_del=True
114
115                if sw_del:
116                        tasks=self._serialize_task(tasks)
117                        with open(wrkfile,'w') as json_data:
118                                json.dump(tasks,json_data,indent=4)
119                        self._register_cron_update()
120                return ({'status':sw_del,'data':msg})
121        #def remove_task
122
123        def _serialize_task(self,task):
124                serial_task={}
125                for name,task_data in task.items():
126                        cont=0
127                        serial_task[name]={}
128                        for serial,data in task_data.items():
129                                serial_task[name].update({cont+1:data})
130                                cont+=1
131                return(serial_task)
132        #def _serialize_task
133
134        def write_tasks(self,task_type,tasks):
135                wrk_dir=self.tasks_dir
136                self._debug("Writing task info")
137                msg=''
138                status=True
139                task_name=list(tasks.keys())[0]
140                task_serial=list(tasks[task_name].keys())[0]
141                task_data=tasks[task_name][task_serial]
142                del tasks[task_name][task_serial]
143                task_serial=task_serial.strip("r")
144                tasks[task_name]={task_serial:task_data}
145                self._debug(tasks)
146                serialized_task={}
147                sched_tasks={}
148                if not os.path.isdir(wrk_dir):
149                        os.makedirs(wrk_dir)
150
151                wrkfile=wrk_dir+'/'+task_name
152                wrkfile=wrkfile.replace(' ','_')
153                if os.path.isfile(wrkfile):
154                        sched_tasks=json.loads(open(wrkfile).read())
155                        serial=len(sched_tasks[task_name])
156                        data=self._fill_task_data(tasks[task_name][task_serial])
157                        tasks[task_name][task_serial]=data
158                        if task_type=='local':
159                                tasks[task_name][task_serial].update({'spread':False})
160                        elif task_type=="remote":
161                                tasks[task_name][task_serial].update({'spread':True})
162
163                        if task_serial in sched_tasks[task_name].keys():
164                                #Modify
165                                self._debug("Modify item %s" % serial)
166                                sched_tasks[task_name][task_serial]=tasks[task_name][task_serial]
167                        else:
168                                #Add
169                                self._debug("Add item %s" % serial)
170                                serialized_data={}
171                                data=self._fill_task_data(tasks[task_name][task_serial])
172                                tasks[task_name][task_serial]=data
173                                self._fill_task_data(tasks[task_name][task_serial])
174                                serialized_data[serial+1]=tasks[task_name][task_serial]
175                                sched_tasks[task_name].update(serialized_data)
176                else:
177                        self._debug("Add new item 1 to %s"%wrkfile)
178                        data=self._fill_task_data(tasks[task_name]["0"])
179                        tasks[task_name]["0"]=data
180                        if task_type=='local':
181                                tasks[task_name]["0"].update({'spread':False})
182                        elif task_type=="remote":
183                                tasks[task_name]["0"].update({'spread':True})
184                        tasks[task_name]={"1":tasks[task_name]["0"]}
185                        sched_tasks=tasks.copy()
186
187                try:
188                        with open(wrkfile,'w') as json_data:
189                                json.dump(sched_tasks,json_data,indent=4)
190                except Exception as e:
191                        msg=e
192                        status=False
193                self._register_cron_update()
194                self._debug("%s updated" % task_name)
195                return({'status':status,'data':msg})
196        #def write_tasks
197
198        def _fill_task_data(self,task):
199                task['kind']=[]
200                if ['spread'] not in task.keys():
201                        task['spread']=False
202                #set task kind
203                if task['dow']!='*':
204                        task['kind'].append('daily')
205                try:
206                        int(task['mon'])
207                        int(task['dom'])
208                        int(task['h'])
209                        int(task['m'])
210                        task['kind']=['fixed']
211                except:
212                        if '/' in (task['mon']+task['dom']+task['h']+task['m']):
213                                task['kind'].append('repeat')
214                return task
215
216        def write_tasks_old(self,tasks):
217                pass
218        #def write_tasks
219
220        def write_custom_task(self,cmd_name,cmd,parms):
221                status=True
222                msg=''
223                tasks={}
224                new_task={}
225                if os.path.isfile(self.custom_tasks):
226                        tasks=json.loads(open(self.custom_tasks).read())
227                        if not 'Personal' in tasks.keys():
228                                tasks['Personal']={}
229                else:
230                        tasks['Personal']={}
231                if '%s' in cmd:
232                        cmd=cmd.replace('%s','')
233                        new_task[cmd_name]=cmd+" '"+parms+"'"
234                else:
235                        new_task[cmd_name]=cmd+' '+parms
236                tasks['Personal'].update(new_task)
237                try:
238                        with open(self.custom_tasks,'w') as json_data:
239                                json.dump(tasks,json_data,indent=4)
240                except Exception as e:
241                        status=False
242                        msg=e
243                return({'status':status,'data':msg})
244        #def write_custom_task
245
246        def _register_cron_update(self):
247                self._debug("Registering trigger var")
248                val=0
249                if not objects["VariablesManager"].get_variable("SCHEDULED_TASKS"):
250                        self._debug("Initializing trigger var")
251                        objects["VariablesManager"].add_variable("SCHEDULED_TASKS",{},"","Scheduled tasks trigger","n4d-scheduler-server",False,False)
252                val=objects["VariablesManager"].get_variable("SCHEDULED_TASKS")
253                if not val:
254                        val=0
255                if val>=1000:
256                        val=0
257                val+=1
258                objects["VariablesManager"].set_variable("SCHEDULED_TASKS",val)
259                self._debug("New value is %s"%val)
260        #def _register_cron_update
Note: See TracBrowser for help on using the repository browser.