source: admin-center/trunk/fuentes/admin-center.install/usr/share/n4d/python-plugins/TaskMan.py @ 2002

Last change on this file since 2002 was 2002, checked in by joamuran, 4 years ago

relase 5.2

File size: 5.7 KB
Line 
1import time
2
3from actasks import Task
4from wsmanager import WSManager
5import websocket
6
7#import threading
8#import tempfile
9
10import os
11#import signal
12import shutil
13import ntpath
14
15
16
17class TaskMan():
18    '''
19    Server Task Manager
20   
21    Manages all tasks in server and its communication via web sockets
22   
23    '''
24    def __init__(self):
25        self.tasks={};                              # Task Dictionary
26        self.wsManager=WSManager();                 # Websocket manager (server)
27        self.ws=websocket.WebSocket();                  # websocket client (for write)
28        self.port=self.wsManager.getWs()["wsport"];     # Getting websocket port
29        self.ws.connect("ws://127.0.0.1:"+str(self.port));   #  "One websocket to dominate all" (Lord of the WebSockets)
30       
31   
32    def getWS(self):
33        try:
34            if(self.port):
35                return {'status':True, 'ws':'ws://127.0.0.1:'+str(self.port)};
36            else:
37                return {'status':False, 'msg':'self.port does not exists'}
38           
39        except Exception as e:
40                return {'status':False, 'msg':str(e)}
41           
42   
43
44    def newTask(self, command):
45        '''
46        creates a new task, identified by an id got from current time, and runs it
47        '''
48        try:
49            # Checking if there is any task running           
50            for task in self.tasks:
51                status=self.getTaskStatus(task)["taskStatus"];
52                if status=="RUNNING":
53                    return {"status": False, "msg":"SERVER_BUSY"}
54           
55            # If there is not any task running, let's continue
56           
57            id=int(round(time.time()*10))       # generating new id from sys time
58            newtask=Task(id, command)           # Create new task object (frow library actasks)           
59            self.tasks[str(id)]=newtask         # Adding to task dictionary           
60            newtask.runTask();                  # running Task
61
62            # Prepare thread (for multicasting, it will reads task log and redirects to websocket)
63            multicast_thread=threading.Thread(target=self.multicast, args=([newtask]))
64           
65            multicast_thread.daemon = True
66            multicast_thread.start()
67                       
68            return {"status": True, "msg":str(id)}
69           
70            pass       
71        except Exception as e:
72            return {"status": False, "msg":str(e)}
73   
74    def getTasks(self):
75        '''
76        Gets info for all tasks
77        '''
78        try:
79            return {"status":True, "msg":str(self.tasks)}
80        except Exception as e:
81            return {"status": False, "msg":str(e)}
82       
83   
84    def getTask(self, taskid):
85        '''
86        Gets info for current task
87        '''
88        try:
89            return {"status":True, "msg":self.tasks[taskid].get()}
90        except Exception as e:
91            return {"status": False, "msg":str(e)}
92       
93    def getTaskStatus(self, taskid):
94        '''
95        Gets status for current task
96        '''
97        try:
98            return {"status":True, "taskStatus":self.tasks[taskid].getStatus()}
99        except Exception as e:
100            return {"status": False, "msg":str(e)}
101       
102
103    def prepareLogForDownload(self, taskid):
104        '''
105        Copies task status from /run/taskmanager/pipe_XXX corresponding to taskid to admincenter/logs
106        '''
107        try:
108            pipe=self.tasks[taskid].getFilePipe();
109            if not os.path.exists("/tmp/taskslog"):
110                os.makedirs("/tmp/taskslog")
111            shutil.copy(pipe, "/tmp/taskslog/");
112           
113            return {"status":True, "file":ntpath.basename(pipe)}
114       
115        except Exception as e:
116            return {"status": False, "msg":str(e)}
117       
118
119    def cancelTask(self, taskid):
120        print "REMOVING ", str(taskid)
121        return self.tasks[taskid].stop();
122        pass
123       
124       
125    def listenTask(self, taskid):
126        '''
127        adds a new listener to tasks listener
128        '''
129        try:
130            print ("listening for task ", taskid)
131            print (self.tasks[taskid].get())
132            #return {"status": True, "msg":self.tasks[taskid].get()}
133           
134           
135            pass
136           
137        except Exception as e:
138            return {"status": False, "msg":str(e)}       
139
140   
141    def multicast(self, task):
142        try:
143            while True:
144                while (str(task.filepipe)==""): # Waitint to have task ready
145                    time.sleep(0.5)
146               
147                print str(task.filepipe)
148                if not os.path.exists(task.filepipe):
149                    time.sleep(1)
150                else:
151                    break;
152                    #return False
153            pipe = open(task.filepipe,'r')
154            pipe.seek(task.seek,0)
155            try:
156                line = pipe.readline()
157                if line:
158                    self.ws.send(str({"id": str(task.taskid), "line":line}));
159                while task.status=="RUNNING" or line:
160                    line = pipe.readline()
161                    while (line):
162                        self.ws.send(str({"id": str(task.taskid), "line":line}));
163                        line=pipe.readline()
164                        task.seek = pipe.tell()
165                   
166            except Exception as e:
167                print "[TASKMANAGER] Exception while reading pipe "+str(e)
168                pass
169                       
170                        ##if self.thread_jobs.has_key(job['job_id']):
171                        ##      self.thread_jobs.pop(job['job_id'])
172        except Exception as e:
173            print "[TASKMANAGER] Exception while reading pipe "+str(e)
174            pass
175           
176           
177           
178           
179           
180           
181       
Note: See TracBrowser for help on using the repository browser.