source: taskScheduler/trunk/fuentes/python3-taskscheduler.install/usr/share/taskscheduler/taskscheduler.py @ 6078

Last change on this file since 6078 was 6078, checked in by Juanma, 2 years ago

Initial Release

File size: 7.1 KB
Line 
1#!/usr/bin/env python3
2###
3#
4###
5
6import os
7import json
8import sys
9try:
10        import xmlrpc.client as n4d
11except ImportError:
12        raise ImportError("xmlrpc not available. Disabling server queries")
13import ssl
14
15class TaskScheduler():
16        def __init__(self):
17                self.dbg=1
18                self.sw_n4d=1
19                if hasattr(sys,'last_value'):
20                #If there's any error at this point it only could be an ImportError caused by xmlrpc
21                        self.sw_n4d=0
22                else:
23                        self.n4dclient=self._n4d_connect()
24                self.tasks_dir="/etc/scheduler/tasks.d"
25                self.sched_tasks_dir=self.tasks_dir+"/scheduled"
26                self.local_tasks_dir=self.sched_tasks_dir+"/local"
27                self.cron_dir="/etc/cron.d"
28                self.task_prefix="local-" #If n4d it's available then prefix must be the one defined in n4d
29        #def __init__
30
31        def _debug(self,msg):
32                if (self.dbg):
33                        print("Scheduler Client: %s" % msg)
34        #def _debug
35
36        def get_available_tasks(self):
37                tasks=[]
38                wrkfiles=self._get_wrkfiles('available')
39                for wrkfile in wrkfiles:
40                        task=self._read_tasks_file(wrkfile)
41                        if task:
42                                tasks.append(task)
43                self._debug(str(tasks))
44                return tasks
45
46        def get_scheduled_tasks(self,taskFilter):
47                tasks=[]
48                self._debug("Connecting to N4d")
49                if taskFilter=='remote':
50                        if self.sw_n4d:
51                                tasks=self._get_remote_tasks()
52                else:
53                        tasks=self._get_local_tasks()
54                return tasks
55
56        def _get_remote_tasks(self):
57                tasks=[]
58                self._debug("Retrieving task list")
59                tasks=self.n4dclient.get_tasks("","ServerScheduler")
60                self._debug(str(tasks))
61                return tasks
62        #def _get_remote_tasks
63
64        def _get_local_tasks(self):
65                tasks=[]
66                wrkfiles=self._get_wrkfiles()
67                for wrkfile in wrkfiles:
68                        task=self._sanitize_fields(self._read_tasks_file(wrkfile))
69                        if task:
70                                tasks.append(task)
71                return tasks
72        #def _get_local_tasks
73
74        def _get_wrkfiles(self,task_type=None):
75                if task_type=='available':
76                        wrkdir=self.tasks_dir
77                else:
78                        wrkdir=self.local_tasks_dir
79                wrkfiles=[]
80                self._debug("Opening %s"%wrkdir)
81                if os.path.isdir(wrkdir):
82                        for f in os.listdir(wrkdir):
83                                wrkfiles.append(wrkdir+'/'+f)
84                return wrkfiles
85       
86        def _read_tasks_file(self,wrkfile):
87                self._debug("Opening %s" % wrkfile)
88                tasks=None
89                if os.path.isfile(wrkfile):
90                        try:
91                                tasks=json.loads(open(wrkfile).read())
92                        except Exception as e:
93                                print(e)
94                                self.errormsg=(("unable to open %s") % wrkfile)
95                                self.status=1
96                                self._debug(self.errormsg)
97                return(tasks)
98        #def _read_tasks_file
99
100        def _sanitize_fields(self,tasks):
101                for name,data in tasks.items():
102                        self._debug("Sanitize %s %s"%(name,data))
103                        for serial in data.keys():
104                                if tasks[name][serial]['dow']!='*':
105                                        tasks[name][serial]['dom']='*'
106                return tasks
107
108        def write_tasks(self,tasks,taskFilter):
109                if taskFilter=='remote':
110                        self._write_server_tasks(tasks)
111                else:
112                        self._write_local_tasks(tasks)
113        #def write_tasks
114
115        def _write_local_tasks(self,tasks):
116                self._debug("Writing local task info")
117                task_name=list(tasks.keys())[0]
118                task_serial=list(tasks[task_name].keys())[0]
119                sched_tasks={}
120                if not os.path.isdir(self.local_tasks_dir):
121                        os.makedirs(self.local_tasks_dir)
122
123                wrkfile=self.local_tasks_dir+'/'+task_name
124                wrkfile=wrkfile.replace(' ','_')
125                if os.path.isfile(wrkfile):
126                        sched_tasks=json.loads(open(wrkfile).read())
127                        serial=len(sched_tasks[task_name])
128                        if task_serial in sched_tasks[task_name].keys():
129                                self._debug("Modify item %s" % serial)
130                                sched_tasks[task_name][task_serial]=tasks[task_name][task_serial]
131                                #Modify
132                        else:
133                                #Add
134                                self._debug("Add item %s" % serial)
135                                serialized_data={}
136                                serialized_data[serial+1]=tasks[task_name][task_serial]
137                                self._debug("Data item %s" % serialized_data)
138                                self._debug("%s" % sched_tasks)
139                                sched_tasks[task_name].update(serialized_data)
140                                self._debug("%s" % sched_tasks)
141                else:
142                        self._debug("Add new item 1 to %s"%wrkfile)
143                        tasks[task_name]={"1":tasks[task_name]["0"]}
144                        sched_tasks=tasks.copy()
145                if sched_tasks:
146                        with open(wrkfile,'w') as json_data:
147                                json.dump(sched_tasks,json_data,indent=4)
148                else:
149                        if os.isfile(wrkfile):
150                                os.remove(wrkfile)
151                self._debug("%s updated" % task_name)
152                self._debug("%s" % sched_tasks)
153                self._send_tasks_to_crontab()
154
155        def _write_server_tasks(self,tasks):
156                self._debug("Sending task info to server")
157                tasks=self.n4dclient.write_tasks("","ServerScheduler",tasks)
158                return True
159
160        def remove_task(self,task_name,task_serial,task_cmd,filter_tasks):
161                if filter_tasks=='remote':
162                        self._remove_remote_task(task_name,task_serial,task_cmd)
163                else:
164                        self._remove_local_task(task_name,task_serial,task_cmd)
165
166        def _remove_local_task(self,task_name,task_serial,task_cmd):
167                self._debug("Removing task from system")
168                sw_del=False
169                wrkfile=self.local_tasks_dir+'/'+task_name
170                wrkfile=wrkfile.replace(' ','_')
171                task=self._read_tasks_file(wrkfile)
172                if task_name in task.keys():
173                        if task_serial in task[task_name].keys():
174                                del task[task_name][task_serial]
175                                sw_del=True
176
177                if sw_del:
178                        task=self._serialize_task(task)
179                        if task:
180                                with open(wrkfile,'w') as json_data:
181                                        json.dump(task,json_data,indent=4)
182                        else:
183                                if os.isfile(wrkfile):
184                                        os.remove(wrkfile)
185                        self._send_tasks_to_crontab()
186                return True
187
188        def _serialize_task(self,task):
189                serial_task={}
190                for name,task_data in task.items():
191                        print("PROCESSING %s"%name)
192                        cont=0
193                        serial_task[name]={}
194                        for serial,data in task_data.items():
195                                print("SERIAL: %s"%serial)
196                                serial_task[name].update({cont+1:data})
197                                cont+=1
198                return(serial_task)
199
200        def _remove_remote_task(self,task_name,task_serial,task_cmd):
201                self._debug("Removing task from server")
202                tasks=self.n4dclient.remove_task("","ServerScheduler",task_name,task_serial,task_cmd)
203#               self._debug("TASKS REMOVED")
204                print(tasks)
205        #def _remove_remote_task
206
207        def _send_tasks_to_crontab(self):
208                self._debug("Scheduling tasks")
209                #Get scheduled tasks
210                tasks=self._get_local_tasks()
211                #Create a dict with the task names
212                task_names={}
213                for task in tasks:
214                        for name in task.keys():
215                                self._debug("Scheduling %s"%name)
216                                self._debug("%s"%task)
217                                fname=name.replace(' ','_')
218                                task_names[fname]=task
219                                self._debug("%s"%task_names)
220                                self._write_crontab_for_task(task_names[fname])
221
222                for f in os.listdir(self.cron_dir):
223                        if f.startswith(self.task_prefix):
224                                fname=f.replace(self.task_prefix,'')
225                                if fname not in task_names.keys():
226                                        self._debug("Removing %s"%f)
227                                        self._debug("%s"%task_names)
228                                        #Task is not scheduled, delete it
229                                        os.remove(self.cron_dir+'/'+f)
230
231        #def _send_tasks_to_crontab
232
233        def _write_crontab_for_task(self,ftask):
234                task=list(ftask.keys())[0]
235                for task_name,task_data in ftask.items():
236                        fname=self.cron_dir+'/'+self.task_prefix+task_name.replace(' ','_')
237                        cron_array=[]
238                        self._debug("Sending %s" %task_name)
239                        self._debug("Data %s"%task_data)
240                        for task_serial,task_info in task_data.items():
241                                cron_task=("%s %s %s %s %s %s"%(task_info['m'],task_info['h'],task_info['dom'],task_info['mon'],\
242                                                        task_info['dow'],task_info['cmd']))
243
244                                cron_array.append(cron_task)
245                        with open(fname,'w') as data:
246                                for cron_line in cron_array:
247                                        data.write(cron_line+"\n")
248
249        def _n4d_connect(self):
250                #Setup SSL
251                context=ssl._create_unverified_context()
252                n4dclient = n4d.ServerProxy("https://server:9779",context=context,allow_none=True)
253                return(n4dclient)
254        #def _n4d_connect
Note: See TracBrowser for help on using the repository browser.