source: taskscheduler/trunk/fuentes/server-scheduler.install/usr/share/n4d/python-plugins/SchedulerServer.py @ 6811

Last change on this file since 6811 was 6811, checked in by Juanma, 19 months ago

WIP

File size: 6.8 KB
Line 
1#!/usr/bin/env python
2######
3#Scheduler library for Lliurex
4#Add to N4D the scheduled tasks
5#This class reads the json file with the scheduled tasks and
6#distributes the info among the clients
7# -*- coding: utf-8 -*-
8import os
9import json
10
11class SchedulerServer():
12        def __init__(self):
13                self.dbg=1
14                self.tasks_dir="/etc/scheduler/tasks.d"
15                self.schedTasksDir=self.tasks_dir+"/scheduled"
16                self.available_tasks_dir="/etc/scheduler/conf.d/tasks"
17                self.custom_tasks=self.available_tasks_dir+"/personal.json"
18                self.remote_tasks_dir=self.tasks_dir+"/remote"
19                self.local_tasks_dir=self.tasks_dir+"/local"
20        #def __init__
21
22        def _debug(self,msg):
23                if (self.dbg):
24                        print("Scheduler: %s" %msg)
25        #def _debug
26
27        def get_tasks(self):
28                return(self._read_wrkfiles(self.tasks_dir))
29        #def get_tasks
30
31        def get_local_tasks(self):
32                local_tasks={}
33                tasks_data=self._read_wrkfiles(self.tasks_dir)['data'].copy()
34                for task_serial in tasks_data.keys():
35                        sw_continue=False
36                        for task in task_serial.keys():
37                                if ['spread_task'] in task.keys():
38                                                if task['spread_task']==True:
39                                                        sw_continue=True
40                                                        break
41                        if sw_continue:
42                                continue
43                        local_tasks.update(task_serial)
44                return local_tasks
45
46        def get_remote_tasks(self):
47                remote_tasks={}
48                tasks_data=self._read_wrkfiles(self.tasks_dir)['data']
49                for task_serial in tasks_data.keys():
50                        sw_continue=False
51                        for task in task_serial.keys():
52                                if ['spread_task'] in task.keys():
53                                        if task['spread_task']==True:
54                                                remote_tasks.update(task_serial)
55                                                break
56                return remote_tasks
57
58        def get_available_tasks(self):
59                return(self._read_wrkfiles(self.available_tasks_dir))
60
61        def _read_wrkfiles(self,folder):
62                tasks={}
63                wrkfiles=self._get_wrkfiles(folder)
64                self._debug(folder)
65                for wrkfile in wrkfiles:
66                        task=self._read_tasks_file(wrkfile)
67                        if task:
68                                tasks.update(task)
69                self._debug("Tasks loaded")
70                self._debug(str(tasks))
71                return({'status':True,'data':tasks})
72
73        def _get_wrkfiles(self,folder):
74                wrkfiles=[]
75                if not os.path.isdir(folder):
76                        os.makedirs(folder)
77                for f in os.listdir(folder):
78                        wrkfiles.append(folder+'/'+f)
79                return wrkfiles
80        #def _get_wrkfiles
81
82        def _read_tasks_file(self,wrkfile):
83                self._debug("Opening %s" % wrkfile)
84                tasks={}
85                if os.path.isfile(wrkfile):
86                        try:
87                                tasks=json.loads(open(wrkfile).read())
88                        except :
89                                errormsg=(("unable to open %s") % wrkfile)
90                                self._debug(errormsg)
91                return(tasks)
92        #def _read_tasks_file
93       
94        def remove_task(self,task):
95                wrk_dir=self.tasks_dir
96                self._debug("Removing task from system")
97                sw_del=False
98                msg=''
99                wrkfile=wrk_dir+'/'+task['name']
100                wrkfile=wrkfile.replace(' ','_')
101                tasks=self._read_tasks_file(wrkfile)
102                if task['name'] in tasks.keys():
103                        if task['serial'] in tasks[task['name']].keys():
104                                del tasks[task['name']][task['serial']]
105                                self._debug("Task deleted")
106                                sw_del=True
107
108                if sw_del:
109                        tasks=self._serialize_task(tasks)
110                        with open(wrkfile,'w') as json_data:
111                                json.dump(tasks,json_data,indent=4)
112                        self._register_cron_update()
113                return ({'status':sw_del,'data':msg})
114        #def remove_task
115
116        def _serialize_task(self,task):
117                serial_task={}
118                for name,task_data in task.items():
119                        cont=0
120                        serial_task[name]={}
121                        for serial,data in task_data.items():
122                                serial_task[name].update({cont+1:data})
123                                cont+=1
124                return(serial_task)
125        #def _serialize_task
126
127        def write_tasks(self,task_type,tasks):
128                wrk_dir=self.tasks_dir
129                self._debug("Writing task info")
130                msg=''
131                status=True
132                task_name=list(tasks.keys())[0]
133                task_serial=list(tasks[task_name].keys())[0]
134                self._debug(tasks)
135                serialized_task={}
136                sched_tasks={}
137                if not os.path.isdir(wrk_dir):
138                        os.makedirs(wrk_dir)
139
140                wrkfile=wrk_dir+'/'+task_name
141                wrkfile=wrkfile.replace(' ','_')
142                if os.path.isfile(wrkfile):
143                        sched_tasks=json.loads(open(wrkfile).read())
144                        serial=len(sched_tasks[task_name])
145                        data=self._fill_task_data(tasks[task_name][task_serial])
146                        tasks[task_name][task_serial]=data
147                        if task_type=='local':
148                                tasks[task_name][task_serial].update({'spread':False})
149                        elif task_type=="remote":
150                                tasks[task_name][task_serial].update({'spread':True})
151
152                        if task_serial in sched_tasks[task_name].keys():
153                                #Modify
154                                self._debug("Modify item %s" % serial)
155                                sched_tasks[task_name][task_serial]=tasks[task_name][task_serial]
156                        else:
157                                #Add
158                                self._debug("Add item %s" % serial)
159                                serialized_data={}
160                                data=self._fill_task_data(tasks[task_name][task_serial])
161                                tasks[task_name][task_serial]=data
162                                self._fill_task_data(tasks[task_name][task_serial])
163                                serialized_data[serial+1]=tasks[task_name][task_serial]
164                                sched_tasks[task_name].update(serialized_data)
165                else:
166                        self._debug("Add new item 1 to %s"%wrkfile)
167                        data=self._fill_task_data(tasks[task_name]["0"])
168                        tasks[task_name]["0"]=data
169                        if task_type=='local':
170                                tasks[task_name]["0"].update({'spread':False})
171                        elif task_type=="remote":
172                                tasks[task_name]["0"].update({'spread':True})
173                        tasks[task_name]={"1":tasks[task_name]["0"]}
174                        sched_tasks=tasks.copy()
175
176                try:
177                        with open(wrkfile,'w') as json_data:
178                                json.dump(sched_tasks,json_data,indent=4)
179                except Exception as e:
180                        msg=e
181                        status=False
182                self._register_cron_update()
183                self._debug("%s updated" % task_name)
184                return({'status':status,'data':msg})
185        #def write_tasks
186
187        def _fill_task_data(self,task):
188                task['kind']=[]
189                if ['spread'] not in task.keys():
190                        task['spread']=False
191                #set task kind
192                if task['dow']!='*':
193                        task['kind'].append('daily')
194                try:
195                        int(task['mon'])
196                        int(task['dom'])
197                        int(task['h'])
198                        int(task['m'])
199                        task['kind']=['fixed']
200                except:
201                        if '/' in (task['mon']+task['dom']+task['h']+task['m']):
202                                task['kind'].append('repeat')
203                return task
204
205        def write_tasks_old(self,tasks):
206                pass
207        #def write_tasks
208
209        def write_custom_task(self,cmd_name,cmd,parms):
210                status=True
211                msg=''
212                tasks={}
213                new_task={}
214                if os.path.isfile(self.custom_tasks):
215                        tasks=json.loads(open(self.custom_tasks).read())
216                        if not 'Personal' in tasks.keys():
217                                tasks['Personal']={}
218                else:
219                        tasks['Personal']={}
220                if '%s' in cmd:
221                        cmd=cmd.replace('%s','')
222                        new_task[cmd_name]=cmd+" '"+parms+"'"
223                else:
224                        new_task[cmd_name]=cmd+' '+parms
225                tasks['Personal'].update(new_task)
226                try:
227                        with open(self.custom_tasks,'w') as json_data:
228                                json.dump(tasks,json_data,indent=4)
229                except Exception as e:
230                        status=False
231                        msg=e
232                return({'status':status,'data':msg})
233        #def write_custom_task
234
235        def _register_cron_update(self):
236                self._debug("Registering trigger var")
237                val=0
238                if not objects["VariablesManager"].get_variable("SCHEDULED_TASKS"):
239                        self._debug("Initializing trigger var")
240                        objects["VariablesManager"].add_variable("SCHEDULED_TASKS",{},"","Scheduled tasks trigger","n4d-scheduler-server",False,False)
241                val=objects["VariablesManager"].get_variable("SCHEDULED_TASKS")
242                if not val:
243                        val=0
244                if val>=1000:
245                        val=0
246                val+=1
247                objects["VariablesManager"].set_variable("SCHEDULED_TASKS",val)
248                self._debug("New value is %s"%val)
249        #def _register_cron_update
Note: See TracBrowser for help on using the repository browser.