source: admin-center/trunk/fuentes/admin-center.install/usr/share/n4d/python-plugins/TaskMan.py @ 6622

Last change on this file since 6622 was 6622, checked in by kbut, 3 years ago

Fix timeout. Clean code

File size: 5.3 KB
Line 
1import time
2
3from actasks import Task
4from wsmanager import WSManager
5import websocket
6
7import threading
8import tempfile
9
10import os
11#import signal
12import shutil
13import ntpath
14
15class TaskMan():
16    '''
17    Server Task Manager
18   
19    Manages all tasks in server and its communication via web sockets
20   
21    '''
22    def __init__(self):
23        self.tasks={}                              # Task Dictionary
24        self.wsManager=WSManager()                 # Websocket manager (server)
25        self.ws=websocket.WebSocket()                  # websocket client (for write)
26        self.port=self.wsManager.getWs()["wsport"]     # Getting websocket port
27        self.ws.connect("ws://127.0.0.1:"+str(self.port))   #  "One websocket to dominate all" (Lord of the WebSockets)
28
29    def getWS(self):
30        try:
31            if(self.port):
32                return {'status':True, 'ws':'ws://127.0.0.1:'+str(self.port)}
33            else:
34                return {'status':False, 'msg':'self.port does not exists'}
35           
36        except Exception as e:
37                return {'status':False, 'msg':str(e)}
38           
39   
40
41    def newTask(self, command, cancelCommand=''):
42        '''
43        creates a new task, identified by an id got from current time, and runs it
44        '''
45        try:
46            # Checking if there is any task running           
47            for task in self.tasks:
48                status=self.getTaskStatus(task)["taskStatus"]
49                if status=="RUNNING":
50                    return {"status": False, "msg":"SERVER_BUSY"}
51           
52            # If there is not any task running, let's continue
53           
54            id = int(round(time.time()*10))               # generating new id from sys time
55            newtask = Task(id, command, cancelCommand)    # Create new task object (frow library actasks)           
56            self.tasks[str(id)] = newtask                 # Adding to task dictionary           
57            newtask.runTask()                           # running Task
58
59            # Prepare thread (for multicasting, it will reads task log and redirects to websocket)
60            multicast_thread = threading.Thread(target=self.multicast, args=([newtask]))
61            multicast_thread.daemon = True
62            multicast_thread.start()
63                       
64            return {"status": True, "msg":str(id)}
65        except Exception as e:
66            return {"status": False, "msg":str(e)}
67   
68    def getTasks(self):
69        '''
70        Gets info for all tasks
71        '''
72        try:
73            return {"status":True, "msg":str(self.tasks)}
74        except Exception as e:
75            return {"status": False, "msg":str(e)}
76       
77   
78    def getTask(self, taskid):
79        '''
80        Gets info for current task
81        '''
82        try:
83            return {"status":True, "msg":self.tasks[taskid].get()}
84        except Exception as e:
85            return {"status": False, "msg":str(e)}
86       
87    def getTaskStatus(self, taskid):
88        '''
89        Gets status for current task
90        '''
91        try:
92            return {"status":True, "taskStatus":self.tasks[taskid].getStatus()}
93        except Exception as e:
94            return {"status": False, "msg":str(e)}
95       
96
97    def prepareLogForDownload(self, taskid):
98        '''
99        Copies task status from /run/taskmanager/pipe_XXX corresponding to taskid to admincenter/logs
100        '''
101        try:
102            pipe=self.tasks[taskid].getFilePipe()
103            if not os.path.exists("/tmp/taskslog"):
104                os.makedirs("/tmp/taskslog")
105            shutil.copy(pipe, "/tmp/taskslog/")
106            return {"status":True, "file":ntpath.basename(pipe)}
107       
108        except Exception as e:
109            return {"status": False, "msg":str(e)}
110
111    def cancelTask(self, taskid):
112        print "REMOVING ", str(taskid)
113        return self.tasks[taskid].stop()
114
115    def listenTask(self, taskid):
116        '''
117        adds a new listener to tasks listener
118        '''
119        try:
120            print ("listening for task ", taskid)
121            print (self.tasks[taskid].get())
122            #return {"status": True, "msg":self.tasks[taskid].get()}
123        except Exception as e:
124            return {"status": False, "msg":str(e)}       
125
126    def multicast(self, task):
127        try:
128            while True:
129                while (str(task.filepipe)==""): # Waitint to have task ready
130                    time.sleep(0.5)
131               
132                print str(task.filepipe)
133                if not os.path.exists(task.filepipe):
134                    time.sleep(1)
135                else:
136                    break
137                    #return False
138            pipe = open(task.filepipe,'r')
139            pipe.seek(task.seek,0)
140            try:
141                line = pipe.readline()
142                if line:
143                    self.ws.send(str({"id": str(task.taskid), "line":line}))
144                while task.status=="RUNNING" or line:
145                    line = pipe.readline()
146                    while (line):
147                        self.ws.send(str({"id": str(task.taskid), "line":line}))
148                        line=pipe.readline()
149                        task.seek = pipe.tell()
150            except Exception as e:
151                print "[TASKMANAGER] Exception while reading pipe "+str(e)
152        except Exception as e:
153            print "[TASKMANAGER] Exception while reading pipe "+str(e)
Note: See TracBrowser for help on using the repository browser.