source: epoptes/trunk/fuentes/epoptes/daemon/bashplex.py @ 295

Last change on this file since 295 was 295, checked in by mabarracus, 3 years ago

copy trusty epoptes code

File size: 6.8 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4###########################################################################
5# BASH plex.
6#
7# Copyright (C) 2010 Fotis Tsamis <ftsamis@gmail.com>
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FINESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program.  If not, see <http://www.gnu.org/licenses/>.
21#
22# On Debian GNU/Linux systems, the complete text of the GNU General
23# Public License can be found in `/usr/share/common-licenses/GPL".
24###########################################################################
25
26import os
27import uuid
28
29from twisted.internet import reactor, protocol, defer, error
30
31try:
32    from cStringIO import StringIO
33except ImportError:
34    from StringIO import StringIO
35
36import exchange
37
38
39class DelimitedBashReceiver(protocol.Protocol):
40    """
41    Send bash commands followed by "\necho <delimiter>",
42    buffer responses until the given delimiter is found.
43    """
44
45    # Template into which per-command delimiters are
46    # inserted. Should include trailing newline.
47    delimiterTemplate = "__delimiter__%s__\n"
48
49    def __init__(self):
50        self.currentDelimiters = []
51        self.buffer = StringIO()
52        self.pingTimer = None
53        self.pingTimeout = None
54        self.connectionLostCalled = False
55
56
57    def getDelimiter(self):
58        """
59        Generate a per-command delimiter. We could
60        use a sequence number here instead, but I'm
61        not sure we could do any more useful error
62        handling with it.
63        """
64        return str(uuid.uuid4())
65
66
67    def command(self, cmd, delimiter=None):
68        """
69        Send a command. We don't do any state-checking
70        here. If another command is in progress, it'll
71        probably be okay, but if serialization is required,
72        it should be done elsewhere.
73        """
74
75        if delimiter is None:
76            delimiter = self.delimiterTemplate % self.getDelimiter()
77
78        d = defer.Deferred()
79
80        self.currentDelimiters.append((delimiter, d))
81
82        delimitedCommand = "%s\necho %s" % (
83            cmd, delimiter)
84
85        # TODO: check the python's debug logging implementation
86        # print "Sending:", delimitedCommand,
87        self.transport.write(delimitedCommand)
88
89        return d
90
91
92    def connectionMade(self):
93        peer = self.transport.getPeer()
94       
95        self.handle = u"%s:%s" % (peer.host, peer.port)
96        d = self.command(self.factory.startupCommands.encode("utf-8"))
97       
98        def forwardConnection(result):
99            exchange.clientConnected(self.handle, self)
100            self.pingTimer = reactor.callLater(self.factory.pingInterval, self.ping)
101       
102        def killConnection(error):
103            print "Error: Could not send the startup functions to the client:", error
104            self._loseConnection()
105       
106        d.addCallback(forwardConnection)
107        d.addErrback(killConnection)
108
109
110    def connectionLost(self, reason):
111        # May be called twice, from pingTimedOut() and from loseConnection()
112        if self.connectionLostCalled:
113            return
114        else:
115            self.connectionLostCalled = True
116
117        try: self.pingTimeout.cancel()
118        except Exception: pass
119
120        try: self.pingTimer.cancel()
121        except Exception: pass
122
123        if self.handle in exchange.knownClients:
124            exchange.clientDisconnected(self.handle)
125
126
127    def dataReceived(self, data):
128        self.buffer.seek(0, os.SEEK_END)
129        self.buffer.write(data)
130
131        if not self.currentDelimiters:
132            return
133
134        (delimiter, d) = self.currentDelimiters[0]
135        # TODO: print "Searching for delimiter:", delimiter
136
137        # Optimize for large buffers by not searching the whole thing every time
138        searchLength = len(data) + len(delimiter)
139
140        self.buffer.seek(-searchLength, os.SEEK_END)
141
142        searchStr = self.buffer.read()
143        searchPos = searchStr.find(delimiter)
144        if searchPos != -1:
145            # TODO: print "Found delimiter:", delimiter
146
147            # Two steps here is correct! If the delimiter was received in the
148            # first packet, then the searchLength is greater than the buffer
149            # length, and doing this seek in one step gives the wrong answer.
150            self.buffer.seek(-searchLength, os.SEEK_END)
151            self.buffer.seek(searchPos, os.SEEK_CUR)
152            pos = self.buffer.tell()
153
154            self.buffer.seek(0)
155            response = self.buffer.read(pos)
156
157            # Throw away the delimiter
158            self.buffer.read(len(delimiter))
159
160            newBuffer = StringIO()
161            newBuffer.write(self.buffer.read())
162            self.buffer = newBuffer
163
164            self.currentDelimiters.pop(0)
165            d.callback(response)
166
167            self.checkForFurtherResponses()
168
169
170    def checkForFurtherResponses(self):
171        # See if there are more responses in the buffer.
172        # The theory here is that if we got one already, we have less than one
173        # packet of data left in the buffer, so reading it all isn't a big deal
174
175        self.buffer.seek(0)
176        rest = self.buffer.read()
177
178        while self.currentDelimiters:
179            (delimiter, d) = self.currentDelimiters[0]
180            try:
181                response, rest = rest.split(delimiter)
182            except ValueError:
183                break
184                       
185            self.currentDelimiters.pop(0)
186            d.callback(response)
187               
188        newBuffer = StringIO()
189        newBuffer.write(rest)
190        self.buffer = newBuffer
191
192
193    def ping(self):
194        self.command('ping').addCallback(self.pingResponse)
195        self.pingTimeout = reactor.callLater(self.factory.pingTimeout, 
196                                             self.pingTimedOut)
197
198    def pingResponse(self, _):
199        # In 10 secs timeouts occur, so ignore responses that arrive later
200        if self.connectionLostCalled:
201            return
202        self.pingTimeout.cancel()
203        self.pingTimer = reactor.callLater(self.factory.pingInterval, self.ping)
204
205
206    def pingTimedOut(self):
207        print "Ping timeout!"
208        self.transport.loseConnection()
209        # loseConnection() may take time, inform the GUI now!
210        self.connectionLost(error.TimeoutError())
211
212
213class DelimitedBashReceiverFactory(protocol.ServerFactory):
214    protocol = DelimitedBashReceiver
215   
216    pingInterval = 10
217    pingTimeout = 10
218   
219    startupCommands = ''
Note: See TracBrowser for help on using the repository browser.