source: filezilla/trunk/fuentes/src/engine/iothread.cpp @ 3185

Last change on this file since 3185 was 3185, checked in by jrpelegrina, 3 years ago

Update new version: 3.15.02

File size: 6.9 KB
Line 
1#include <filezilla.h>
2
3#include "iothread.h"
4
5#include <libfilezilla/file.hpp>
6
7#include <wx/log.h>
8
9CIOThread::CIOThread()
10{
11        m_buffers[0] = new char[BUFFERSIZE*BUFFERCOUNT];
12        for (unsigned int i = 0; i < BUFFERCOUNT; ++i) {
13                m_buffers[i] = m_buffers[0] + BUFFERSIZE * i;
14                m_bufferLens[i] = 0;
15        }
16}
17
18CIOThread::~CIOThread()
19{
20        Close();
21
22        delete [] m_buffers[0];
23}
24
25void CIOThread::Close()
26{
27        if (m_pFile) {
28                // The file might have been preallocated and the transfer stopped before being completed
29                // so always truncate the file to the actually written size before closing it.
30                if (!m_read)
31                        m_pFile->truncate();
32
33                m_pFile.reset();
34        }
35}
36
37bool CIOThread::Create(std::unique_ptr<fz::file> && pFile, bool read, bool binary)
38{
39        wxASSERT(pFile);
40
41        Close();
42
43        m_pFile = std::move(pFile);
44        m_read = read;
45        m_binary = binary;
46
47        if (read) {
48                m_curAppBuf = BUFFERCOUNT - 1;
49                m_curThreadBuf = 0;
50        }
51        else {
52                m_curAppBuf = -1;
53                m_curThreadBuf = 0;
54        }
55
56#ifdef SIMULATE_IO
57        size_ = m_pFile->size();
58#endif
59       
60        m_running = true;
61       
62        if (!run()) {
63                m_running = false;
64                return false;
65        }
66       
67        return true;
68}
69
70void CIOThread::entry()
71{
72        if (m_read) {
73                while (m_running) {
74                        auto len = ReadFromFile(m_buffers[m_curThreadBuf], BUFFERSIZE);
75
76                        fz::scoped_lock l(m_mutex);
77
78                        if (m_appWaiting) {
79                                if (!m_evtHandler) {
80                                        m_running = false;
81                                        break;
82                                }
83                                m_appWaiting = false;
84                                m_evtHandler->send_event<CIOThreadEvent>();
85                        }
86
87                        if (len == -1) {
88                                m_error = true;
89                                m_running = false;
90                                break;
91                        }
92
93                        m_bufferLens[m_curThreadBuf] = len;
94
95                        if (!len) {
96                                m_running = false;
97                                break;
98                        }
99
100                        ++m_curThreadBuf %= BUFFERCOUNT;
101                        if (m_curThreadBuf == m_curAppBuf) {
102                                if (!m_running)
103                                        break;
104
105                                m_threadWaiting = true;
106                                if (m_running)
107                                        m_condition.wait(l);
108                        }
109                }
110        }
111        else {
112                fz::scoped_lock l(m_mutex);
113                while (m_curAppBuf == -1) {
114                        if (!m_running) {
115                                return;
116                        }
117                        else {
118                                m_threadWaiting = true;
119                                m_condition.wait(l);
120                        }
121                }
122
123                for (;;) {
124                        while (m_curThreadBuf == m_curAppBuf) {
125                                if (!m_running) {
126                                        return;
127                                }
128                                m_threadWaiting = true;
129                                m_condition.wait(l);
130                        }
131
132                        l.unlock();
133                        bool writeSuccessful = WriteToFile(m_buffers[m_curThreadBuf], BUFFERSIZE);
134                        l.lock();
135
136                        if (!writeSuccessful) {
137                                m_error = true;
138                                m_running = false;
139                        }
140
141                        if (m_appWaiting) {
142                                if (!m_evtHandler) {
143                                        m_running = false;
144                                        break;
145                                }
146                                m_appWaiting = false;
147                                m_evtHandler->send_event<CIOThreadEvent>();
148                        }
149
150                        if (m_error)
151                                break;
152
153                        ++m_curThreadBuf %= BUFFERCOUNT;
154                }
155        }
156}
157
158int CIOThread::GetNextWriteBuffer(char** pBuffer)
159{
160        fz::scoped_lock l(m_mutex);
161
162        if (m_error)
163                return IO_Error;
164
165        if (m_curAppBuf == -1) {
166                m_curAppBuf = 0;
167                *pBuffer = m_buffers[0];
168                return IO_Success;
169        }
170
171        int newBuf = (m_curAppBuf + 1) % BUFFERCOUNT;
172        if (newBuf == m_curThreadBuf) {
173                m_appWaiting = true;
174                return IO_Again;
175        }
176
177        if (m_threadWaiting) {
178                m_condition.signal(l);
179                m_threadWaiting = false;
180        }
181
182        m_curAppBuf = newBuf;
183        *pBuffer = m_buffers[newBuf];
184
185        return IO_Success;
186}
187
188bool CIOThread::Finalize(int len)
189{
190        wxASSERT(m_pFile);
191
192        Destroy();
193
194        if (m_curAppBuf == -1)
195                return true;
196
197        if (m_error)
198                return false;
199
200        if (!len)
201                return true;
202
203        if (!WriteToFile(m_buffers[m_curAppBuf], len))
204                return false;
205
206#ifndef __WXMSW__
207        if (!m_binary && m_wasCarriageReturn) {
208                const char CR = '\r';
209                if (m_pFile->write(&CR, 1) != 1)
210                        return false;
211        }
212#endif
213
214        m_curAppBuf = -1;
215
216        return true;
217}
218
219int CIOThread::GetNextReadBuffer(char** pBuffer)
220{
221        wxASSERT(m_read);
222
223        int newBuf = (m_curAppBuf + 1) % BUFFERCOUNT;
224
225        fz::scoped_lock l(m_mutex);
226
227        if (newBuf == m_curThreadBuf) {
228                if (m_error)
229                        return IO_Error;
230                else if (!m_running)
231                        return IO_Success;
232                else {
233                        m_appWaiting = true;
234                        return IO_Again;
235                }
236        }
237
238        if (m_threadWaiting) {
239                m_condition.signal(l);
240                m_threadWaiting = false;
241        }
242
243        *pBuffer = m_buffers[newBuf];
244        m_curAppBuf = newBuf;
245
246        return m_bufferLens[newBuf];
247}
248
249void CIOThread::Destroy()
250{
251        {
252                fz::scoped_lock l(m_mutex);
253                if (m_running) {
254                        m_running = false;
255                        if (m_threadWaiting) {
256                                m_threadWaiting = false;
257                                m_condition.signal(l);
258                        }
259                }
260        }
261
262        join();
263}
264
265int64_t CIOThread::ReadFromFile(char* pBuffer, int64_t maxLen)
266{
267#ifdef SIMULATE_IO
268        if (size_ < 0) {
269                return 0;
270        }
271        size_ -= maxLen;
272        return maxLen;
273#endif
274
275        // In binary mode, no conversion has to be done.
276        // Also, under Windows the native newline format is already identical
277        // to the newline format of the FTP protocol
278#ifndef __WXMSW__
279        if (m_binary)
280#endif
281                return m_pFile->read(pBuffer, maxLen);
282
283#ifndef __WXMSW__
284
285        // In the worst case, length will doubled: If reading
286        // only LFs from the file
287        const int readLen = maxLen / 2;
288
289        char* r = pBuffer + readLen;
290        auto len = m_pFile->read(r, readLen);
291        if (!len || len <= -1)
292                return len;
293
294        const char* const end = r + len;
295        char* w = pBuffer;
296
297        // Convert all stand-alone LFs into CRLF pairs.
298        while (r != end) {
299                char c = *r++;
300                if (c == '\n') {
301                        if (!m_wasCarriageReturn)
302                                *w++ = '\r';
303                        m_wasCarriageReturn = false;
304                }
305                else if (c == '\r')
306                        m_wasCarriageReturn = true;
307                else
308                        m_wasCarriageReturn = false;
309
310                *w++ = c;
311        }
312
313        return w - pBuffer;
314#endif
315}
316
317bool CIOThread::WriteToFile(char* pBuffer, int64_t len)
318{
319#ifdef SIMULATE_IO
320        return true;
321#endif
322        // In binary mode, no conversion has to be done.
323        // Also, under Windows the native newline format is already identical
324        // to the newline format of the FTP protocol
325#ifndef __WXMSW__
326        if (m_binary) {
327#endif
328                return DoWrite(pBuffer, len);
329#ifndef __WXMSW__
330        }
331        else {
332
333                // On all CRLF pairs, omit the CR. Don't harm stand-alone CRs
334
335                // Handle trailing CR from last write
336                if (m_wasCarriageReturn && len && *pBuffer != '\n' && *pBuffer != '\r') {
337                        m_wasCarriageReturn = false;
338                        const char CR = '\r';
339                        if (!DoWrite(&CR, 1))
340                                return false;
341                }
342
343                // Skip forward to end of buffer or first CR
344                const char* r = pBuffer;
345                const char* const end = pBuffer + len;
346                while (r != end && *r != '\r') {
347                        ++r;
348                }
349
350                if (r != end) {
351                        // Now we gotta move data and also handle additional CRs.
352                        m_wasCarriageReturn = true;
353
354                        char* w = const_cast<char*>(r++);
355                        for (; r != end; ++r) {
356                                if (*r == '\r') {
357                                        m_wasCarriageReturn = true;
358                                }
359                                else if (*r == '\n') {
360                                        m_wasCarriageReturn = false;
361                                        *(w++) = *r;
362                                }
363                                else {
364                                        if (m_wasCarriageReturn) {
365                                                m_wasCarriageReturn = false;
366                                                *(w++) = '\r';
367                                        }
368                                        *(w++) = *r;
369                                }
370                        }
371                        len = w - pBuffer;
372                }
373                return DoWrite(pBuffer, len);
374        }
375#endif
376}
377
378bool CIOThread::DoWrite(const char* pBuffer, int64_t len)
379{
380        auto written = m_pFile->write(pBuffer, len);
381        if (written == len) {
382                return true;
383        }
384
385        int code = wxSysErrorCode();
386
387        const wxString error = wxSysErrorMsg(code);
388
389        fz::scoped_lock locker(m_mutex);
390        m_error_description = error;
391
392        return false;
393}
394
395wxString CIOThread::GetError()
396{
397        fz::scoped_lock locker(m_mutex);
398        return m_error_description;
399}
400
401void CIOThread::SetEventHandler(fz::event_handler* handler)
402{
403        fz::scoped_lock locker(m_mutex);
404        m_evtHandler = handler;
405}
Note: See TracBrowser for help on using the repository browser.