source: filezilla/trunk/fuentes/src/engine/iothread.cpp @ 130

Last change on this file since 130 was 130, checked in by jrpelegrina, 4 years ago

First release to xenial

File size: 6.7 KB
Line 
1#include <filezilla.h>
2
3#include "file.h"
4#include "iothread.h"
5
6#include <wx/log.h>
7
8CIOThread::CIOThread()
9        : wxThread(wxTHREAD_JOINABLE)
10{
11        m_buffers[0] = new char[BUFFERSIZE*BUFFERCOUNT];
12        for (unsigned int i = 0; i < BUFFERCOUNT; ++i) {
13                m_buffers[i] = m_buffers[0] + BUFFERSIZE * i;
14                m_bufferLens[i] = 0;
15        }
16}
17
18CIOThread::~CIOThread()
19{
20        Close();
21
22        delete [] m_buffers[0];
23}
24
25void CIOThread::Close()
26{
27        if (m_pFile) {
28                // The file might have been preallocated and the transfer stopped before being completed
29                // so always truncate the file to the actually written size before closing it.
30                if (!m_read)
31                        m_pFile->Truncate();
32
33                m_pFile.reset();
34        }
35}
36
37bool CIOThread::Create(std::unique_ptr<CFile> && pFile, bool read, bool binary)
38{
39        wxASSERT(pFile);
40
41        Close();
42
43        m_pFile = std::move(pFile);
44        m_read = read;
45        m_binary = binary;
46
47        if (read) {
48                m_curAppBuf = BUFFERCOUNT - 1;
49                m_curThreadBuf = 0;
50        }
51        else {
52                m_curAppBuf = -1;
53                m_curThreadBuf = 0;
54        }
55
56#ifdef SIMULATE_IO
57        size_ = m_pFile->Length();
58#endif
59
60        m_running = true;
61        wxThread::Create();
62        wxThread::Run();
63
64        return true;
65}
66
67wxThread::ExitCode CIOThread::Entry()
68{
69        if (m_read) {
70                while (m_running) {
71                        int len = ReadFromFile(m_buffers[m_curThreadBuf], BUFFERSIZE);
72
73                        scoped_lock l(m_mutex);
74
75                        if (m_appWaiting) {
76                                if (!m_evtHandler) {
77                                        m_running = false;
78                                        break;
79                                }
80                                m_appWaiting = false;
81                                m_evtHandler->SendEvent<CIOThreadEvent>();
82                        }
83
84                        if (len == wxInvalidOffset) {
85                                m_error = true;
86                                m_running = false;
87                                break;
88                        }
89
90                        m_bufferLens[m_curThreadBuf] = len;
91
92                        if (!len) {
93                                m_running = false;
94                                break;
95                        }
96
97                        ++m_curThreadBuf %= BUFFERCOUNT;
98                        if (m_curThreadBuf == m_curAppBuf) {
99                                if (!m_running)
100                                        break;
101
102                                m_threadWaiting = true;
103                                if (m_running)
104                                        m_condition.wait(l);
105                        }
106                }
107        }
108        else {
109                scoped_lock l(m_mutex);
110                while (m_curAppBuf == -1) {
111                        if (!m_running) {
112                                return 0;
113                        }
114                        else {
115                                m_threadWaiting = true;
116                                m_condition.wait(l);
117                        }
118                }
119
120                for (;;) {
121                        while (m_curThreadBuf == m_curAppBuf) {
122                                if (!m_running) {
123                                        return 0;
124                                }
125                                m_threadWaiting = true;
126                                m_condition.wait(l);
127                        }
128
129                        l.unlock();
130                        bool writeSuccessful = WriteToFile(m_buffers[m_curThreadBuf], BUFFERSIZE);
131                        l.lock();
132
133                        if (!writeSuccessful) {
134                                m_error = true;
135                                m_running = false;
136                        }
137
138                        if (m_appWaiting) {
139                                if (!m_evtHandler) {
140                                        m_running = false;
141                                        break;
142                                }
143                                m_appWaiting = false;
144                                m_evtHandler->SendEvent<CIOThreadEvent>();
145                        }
146
147                        if (m_error)
148                                break;
149
150                        ++m_curThreadBuf %= BUFFERCOUNT;
151                }
152        }
153
154        return 0;
155}
156
157int CIOThread::GetNextWriteBuffer(char** pBuffer)
158{
159        wxASSERT(!m_destroyed);
160
161        scoped_lock l(m_mutex);
162
163        if (m_error)
164                return IO_Error;
165
166        if (m_curAppBuf == -1) {
167                m_curAppBuf = 0;
168                *pBuffer = m_buffers[0];
169                return IO_Success;
170        }
171
172        int newBuf = (m_curAppBuf + 1) % BUFFERCOUNT;
173        if (newBuf == m_curThreadBuf) {
174                m_appWaiting = true;
175                return IO_Again;
176        }
177
178        if (m_threadWaiting) {
179                m_condition.signal(l);
180                m_threadWaiting = false;
181        }
182
183        m_curAppBuf = newBuf;
184        *pBuffer = m_buffers[newBuf];
185
186        return IO_Success;
187}
188
189bool CIOThread::Finalize(int len)
190{
191        wxASSERT(m_pFile);
192
193        if (m_destroyed)
194                return true;
195
196        Destroy();
197
198        if (m_curAppBuf == -1)
199                return true;
200
201        if (m_error)
202                return false;
203
204        if (!len)
205                return true;
206
207        if (!WriteToFile(m_buffers[m_curAppBuf], len))
208                return false;
209
210#ifndef __WXMSW__
211        if (!m_binary && m_wasCarriageReturn) {
212                const char CR = '\r';
213                if (m_pFile->Write(&CR, 1) != 1)
214                        return false;
215        }
216#endif
217        return true;
218}
219
220int CIOThread::GetNextReadBuffer(char** pBuffer)
221{
222        wxASSERT(!m_destroyed);
223        wxASSERT(m_read);
224
225        int newBuf = (m_curAppBuf + 1) % BUFFERCOUNT;
226
227        scoped_lock l(m_mutex);
228
229        if (newBuf == m_curThreadBuf) {
230                if (m_error)
231                        return IO_Error;
232                else if (!m_running)
233                        return IO_Success;
234                else {
235                        m_appWaiting = true;
236                        return IO_Again;
237                }
238        }
239
240        if (m_threadWaiting) {
241                m_condition.signal(l);
242                m_threadWaiting = false;
243        }
244
245        *pBuffer = m_buffers[newBuf];
246        m_curAppBuf = newBuf;
247
248        return m_bufferLens[newBuf];
249}
250
251void CIOThread::Destroy()
252{
253        if (m_destroyed)
254                return;
255        m_destroyed = true;
256
257        scoped_lock l(m_mutex);
258
259        m_running = false;
260        if (m_threadWaiting) {
261                m_threadWaiting = false;
262                m_condition.signal(l);
263        }
264        l.unlock();
265
266        Wait(wxTHREAD_WAIT_BLOCK);
267}
268
269int CIOThread::ReadFromFile(char* pBuffer, int maxLen)
270{
271#ifdef SIMULATE_IO
272        if (size_ < 0) {
273                return 0;
274        }
275        size_ -= maxLen;
276        return maxLen;
277#endif
278
279        // In binary mode, no conversion has to be done.
280        // Also, under Windows the native newline format is already identical
281        // to the newline format of the FTP protocol
282#ifndef __WXMSW__
283        if (m_binary)
284#endif
285                return m_pFile->Read(pBuffer, maxLen);
286
287#ifndef __WXMSW__
288
289        // In the worst case, length will doubled: If reading
290        // only LFs from the file
291        const int readLen = maxLen / 2;
292
293        char* r = pBuffer + readLen;
294        int len = m_pFile->Read(r, readLen);
295        if (!len || len == wxInvalidOffset)
296                return len;
297
298        const char* const end = r + len;
299        char* w = pBuffer;
300
301        // Convert all stand-alone LFs into CRLF pairs.
302        while (r != end) {
303                char c = *r++;
304                if (c == '\n') {
305                        if (!m_wasCarriageReturn)
306                                *w++ = '\r';
307                        m_wasCarriageReturn = false;
308                }
309                else if (c == '\r')
310                        m_wasCarriageReturn = true;
311                else
312                        m_wasCarriageReturn = false;
313
314                *w++ = c;
315        }
316
317        return w - pBuffer;
318#endif
319}
320
321bool CIOThread::WriteToFile(char* pBuffer, int len)
322{
323#ifdef SIMULATE_IO
324        return true;
325#endif
326        // In binary mode, no conversion has to be done.
327        // Also, under Windows the native newline format is already identical
328        // to the newline format of the FTP protocol
329#ifndef __WXMSW__
330        if (m_binary) {
331#endif
332                return DoWrite(pBuffer, len);
333#ifndef __WXMSW__
334        }
335        else {
336                // On all CRLF pairs, omit the CR. Don't harm stand-alone CRs
337                // I assume disk access is buffered, otherwise the 1 byte writes are
338                // going to hurt performance.
339                const char CR = '\r';
340                const char* const end = pBuffer + len;
341                for (char* r = pBuffer; r != end; ++r) {
342                        char c = *r;
343                        if (c == '\r')
344                                m_wasCarriageReturn = true;
345                        else if (c == '\n') {
346                                m_wasCarriageReturn = false;
347                                if (!DoWrite(&c, 1))
348                                        return false;
349                        }
350                        else {
351                                if (m_wasCarriageReturn) {
352                                        m_wasCarriageReturn = false;
353                                        if (!DoWrite(&CR, 1))
354                                                return false;
355                                }
356
357                                if (!DoWrite(&c, 1))
358                                        return false;
359                        }
360                }
361                return true;
362        }
363#endif
364}
365
366bool CIOThread::DoWrite(const char* pBuffer, int len)
367{
368        int written = m_pFile->Write(pBuffer, len);
369        if (written == len) {
370                return true;
371        }
372
373        int code = wxSysErrorCode();
374
375        const wxString error = wxSysErrorMsg(code);
376
377        scoped_lock locker(m_mutex);
378        m_error_description = error;
379
380        return false;
381}
382
383wxString CIOThread::GetError()
384{
385        scoped_lock locker(m_mutex);
386        return m_error_description;
387}
388
389void CIOThread::SetEventHandler(CEventHandler* handler)
390{
391        scoped_lock locker(m_mutex);
392        m_evtHandler = handler;
393}
Note: See TracBrowser for help on using the repository browser.