source: squid-ssl/trunk/fuentes/src/DiskIO/IpcIo/IpcIoFile.cc @ 5495

Last change on this file since 5495 was 5495, checked in by Juanma, 2 years ago

Initial release

File size: 31.0 KB
Line 
1/*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 47    Store Directory Routines */
10
#include "squid.h"
#include "base/RunnersRegistry.h"
#include "base/TextException.h"
#include "disk.h"
#include "DiskIO/IORequestor.h"
#include "DiskIO/IpcIo/IpcIoFile.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fd.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "ipc/Messages.h"
#include "ipc/Port.h"
#include "ipc/Queue.h"
#include "ipc/StrandSearch.h"
#include "ipc/UdsOp.h"
#include "SBuf.h"
#include "SquidConfig.h"
#include "SquidTime.h"
#include "StatCounters.h"
#include "tools.h"

#include <algorithm>
#include <cerrno>
#include <memory>
34
35CBDATA_CLASS_INIT(IpcIoFile);
36
37/// shared memory segment path to use for IpcIoFile maps
38static const char *const ShmLabel = "io_file";
39/// a single worker-to-disker or disker-to-worker queue capacity; up
40/// to 2*QueueCapacity I/O requests queued between a single worker and
41/// a single disker
42// TODO: make configurable or compute from squid.conf settings if possible
43static const int QueueCapacity = 1024;
44
45const double IpcIoFile::Timeout = 7; // seconds;  XXX: ALL,9 may require more
46IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
47IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
48std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
49
50bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;
51
52static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
53static void DiskerClose(const SBuf &path);
54
55/// IpcIo wrapper for debugs() streams; XXX: find a better class name
56struct SipcIo {
57    SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
58        worker(aWorker), msg(aMsg), disker(aDisker) {}
59
60    int worker;
61    const IpcIoMsg &msg;
62    int disker;
63};
64
65std::ostream &
66operator <<(std::ostream &os, const SipcIo &sio)
67{
68    return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
69           (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
70}
71
72IpcIoFile::IpcIoFile(char const *aDb):
73    dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
74    olderRequests(&requestMap1), newerRequests(&requestMap2),
75    timeoutCheckScheduled(false)
76{
77}
78
79IpcIoFile::~IpcIoFile()
80{
81    if (diskId >= 0) {
82        const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
83        // XXX: warn and continue?
84        Must(i != IpcIoFiles.end());
85        Must(i->second == this);
86        IpcIoFiles.erase(i);
87    }
88}
89
90void
91IpcIoFile::configure(const Config &cfg)
92{
93    DiskFile::configure(cfg);
94    config = cfg;
95}
96
97void
98IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
99{
100    ioRequestor = callback;
101    Must(diskId < 0); // we do not know our disker yet
102
103    if (!queue.get())
104        queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
105
106    if (IamDiskProcess()) {
107        error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
108        if (error_)
109            return;
110
111        diskId = KidIdentifier;
112        const bool inserted =
113            IpcIoFiles.insert(std::make_pair(diskId, this)).second;
114        Must(inserted);
115
116        queue->localRateLimit() =
117            static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);
118
119        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
120        ann.strand.tag = dbName;
121        Ipc::TypedMsgHdr message;
122        ann.pack(message);
123        SendMessage(Ipc::Port::CoordinatorAddr(), message);
124
125        ioRequestor->ioCompletedNotification();
126        return;
127    }
128
129    Ipc::StrandSearchRequest request;
130    request.requestorId = KidIdentifier;
131    request.tag = dbName;
132
133    Ipc::TypedMsgHdr msg;
134    request.pack(msg);
135    Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);
136
137    WaitingForOpen.push_back(this);
138
139    eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
140             this, Timeout, 0, false); // "this" pointer is used as id
141}
142
143void
144IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
145{
146    Must(diskId < 0); // we do not know our disker yet
147
148    if (!response) {
149        debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
150               "channel establishment timeout");
151        error_ = true;
152    } else {
153        diskId = response->strand.kidId;
154        if (diskId >= 0) {
155            const bool inserted =
156                IpcIoFiles.insert(std::make_pair(diskId, this)).second;
157            Must(inserted);
158        } else {
159            error_ = true;
160            debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
161                   "responsibility for " << dbName);
162        }
163    }
164
165    ioRequestor->ioCompletedNotification();
166}
167
168/**
169 * Alias for IpcIoFile::open(...)
170 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
171 */
172void
173IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
174{
175    assert(false); // check
176    /* We use the same logic path for open */
177    open(flags, mode, callback);
178}
179
180void
181IpcIoFile::close()
182{
183    assert(ioRequestor != NULL);
184
185    if (IamDiskProcess())
186        DiskerClose(SBuf(dbName.termedBuf()));
187    // XXX: else nothing to do?
188
189    ioRequestor->closeCompleted();
190}
191
192bool
193IpcIoFile::canRead() const
194{
195    return diskId >= 0 && !error_ && canWait();
196}
197
198bool
199IpcIoFile::canWrite() const
200{
201    return diskId >= 0 && !error_ && canWait();
202}
203
204bool
205IpcIoFile::error() const
206{
207    return error_;
208}
209
210void
211IpcIoFile::read(ReadRequest *readRequest)
212{
213    debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
214           readRequest->offset << ")");
215
216    assert(ioRequestor != NULL);
217    assert(readRequest->offset >= 0);
218    Must(!error_);
219
220    //assert(minOffset < 0 || minOffset <= readRequest->offset);
221    //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
222
223    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
224    pending->readRequest = readRequest;
225    push(pending);
226}
227
228void
229IpcIoFile::readCompleted(ReadRequest *readRequest,
230                         IpcIoMsg *const response)
231{
232    bool ioError = false;
233    if (!response) {
234        debugs(79, 3, HERE << "error: timeout");
235        ioError = true; // I/O timeout does not warrant setting error_?
236    } else {
237        if (response->xerrno) {
238            debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
239                   xstrerr(response->xerrno));
240            ioError = error_ = true;
241        } else if (!response->page) {
242            debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
243                   "out of shared memory pages");
244            ioError = true;
245        } else {
246            const char *const buf = Ipc::Mem::PagePointer(response->page);
247            memcpy(readRequest->buf, buf, response->len);
248        }
249
250        Ipc::Mem::PutPage(response->page);
251    }
252
253    const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
254    const int errflag = ioError ? DISK_ERROR :DISK_OK;
255    ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
256}
257
258void
259IpcIoFile::write(WriteRequest *writeRequest)
260{
261    debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
262           writeRequest->offset << ")");
263
264    assert(ioRequestor != NULL);
265    assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
266    assert(writeRequest->offset >= 0);
267    Must(!error_);
268
269    //assert(minOffset < 0 || minOffset <= writeRequest->offset);
270    //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
271
272    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
273    pending->writeRequest = writeRequest;
274    push(pending);
275}
276
277void
278IpcIoFile::writeCompleted(WriteRequest *writeRequest,
279                          const IpcIoMsg *const response)
280{
281    bool ioError = false;
282    if (!response) {
283        debugs(79, 3, "disker " << diskId << " timeout");
284        ioError = true; // I/O timeout does not warrant setting error_?
285    } else if (response->xerrno) {
286        debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
287               " error writing " << writeRequest->len << " bytes at " <<
288               writeRequest->offset << ": " << xstrerr(response->xerrno) <<
289               "; this worker will stop using " << dbName);
290        ioError = error_ = true;
291    } else if (response->len != writeRequest->len) {
292        debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
293               response->len << " instead of " << writeRequest->len <<
294               " bytes (offset " << writeRequest->offset << "); " <<
295               "this worker will stop using " << dbName);
296        error_ = true;
297    }
298
299    if (writeRequest->free_func)
300        (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
301
302    if (!ioError) {
303        debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
304               diskId << " at " << writeRequest->offset);
305    }
306
307    const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
308    const int errflag = ioError ? DISK_ERROR :DISK_OK;
309    ioRequestor->writeCompleted(errflag, rlen, writeRequest);
310}
311
312bool
313IpcIoFile::ioInProgress() const
314{
315    return !olderRequests->empty() || !newerRequests->empty();
316}
317
318/// track a new pending request
319void
320IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
321{
322    const std::pair<RequestMap::iterator,bool> result =
323        newerRequests->insert(std::make_pair(id, pending));
324    Must(result.second); // failures means that id was not unique
325    if (!timeoutCheckScheduled)
326        scheduleTimeoutCheck();
327}
328
329/// push an I/O request to disker
330void
331IpcIoFile::push(IpcIoPendingRequest *const pending)
332{
333    // prevent queue overflows: check for responses to earlier requests
334    // warning: this call may result in indirect push() recursion
335    HandleResponses("before push");
336
337    debugs(47, 7, HERE);
338    Must(diskId >= 0);
339    Must(pending);
340    Must(pending->readRequest || pending->writeRequest);
341
342    IpcIoMsg ipcIo;
343    try {
344        if (++lastRequestId == 0) // don't use zero value as requestId
345            ++lastRequestId;
346        ipcIo.requestId = lastRequestId;
347        ipcIo.start = current_time;
348        if (pending->readRequest) {
349            ipcIo.command = IpcIo::cmdRead;
350            ipcIo.offset = pending->readRequest->offset;
351            ipcIo.len = pending->readRequest->len;
352        } else { // pending->writeRequest
353            Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
354            if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
355                ipcIo.len = 0;
356                throw TexcHere("run out of shared memory pages for IPC I/O");
357            }
358            ipcIo.command = IpcIo::cmdWrite;
359            ipcIo.offset = pending->writeRequest->offset;
360            ipcIo.len = pending->writeRequest->len;
361            char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
362            memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
363        }
364
365        debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
366
367        if (queue->push(diskId, ipcIo))
368            Notify(diskId); // must notify disker
369        trackPendingRequest(ipcIo.requestId, pending);
370    } catch (const Queue::Full &) {
371        debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
372               dbName << " overflow: " <<
373               SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
374        // TODO: grow queue size
375
376        pending->completeIo(NULL);
377        delete pending;
378    } catch (const TextException &e) {
379        debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
380        pending->completeIo(NULL);
381        delete pending;
382    }
383}
384
385/// whether we think there is enough time to complete the I/O
386bool
387IpcIoFile::canWait() const
388{
389    if (!config.ioTimeout)
390        return true; // no timeout specified
391
392    IpcIoMsg oldestIo;
393    if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
394        return true; // we cannot estimate expected wait time; assume it is OK
395
396    const int oldestWait = tvSubMsec(oldestIo.start, current_time);
397
398    int rateWait = -1; // time in millisecons
399    const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
400    if (ioRate > 0) {
401        // if there are N requests pending, the new one will wait at
402        // least N/max-swap-rate seconds
403        rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
404        // adjust N/max-swap-rate value based on the queue "balance"
405        // member, in case we have been borrowing time against future
406        // I/O already
407        rateWait += queue->balance(diskId);
408    }
409
410    const int expectedWait = max(oldestWait, rateWait);
411    if (expectedWait < 0 ||
412            static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
413        return true; // expected wait time is acceptible
414
415    debugs(47,2, HERE << "cannot wait: " << expectedWait <<
416           " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
417    return false; // do not want to wait that long
418}
419
420/// called when coordinator responds to worker open request
421void
422IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
423{
424    debugs(47, 7, HERE << "coordinator response to open request");
425    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
426            i != WaitingForOpen.end(); ++i) {
427        if (response.strand.tag == (*i)->dbName) {
428            (*i)->openCompleted(&response);
429            WaitingForOpen.erase(i);
430            return;
431        }
432    }
433
434    debugs(47, 4, HERE << "LATE disker response to open for " <<
435           response.strand.tag);
436    // nothing we can do about it; completeIo() has been called already
437}
438
439void
440IpcIoFile::HandleResponses(const char *const when)
441{
442    debugs(47, 4, HERE << "popping all " << when);
443    IpcIoMsg ipcIo;
444    // get all responses we can: since we are not pushing, this will stop
445    int diskId;
446    while (queue->pop(diskId, ipcIo)) {
447        const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
448        Must(i != IpcIoFiles.end()); // TODO: warn but continue
449        i->second->handleResponse(ipcIo);
450    }
451}
452
453void
454IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
455{
456    const int requestId = ipcIo.requestId;
457    debugs(47, 7, HERE << "popped disker response: " <<
458           SipcIo(KidIdentifier, ipcIo, diskId));
459
460    Must(requestId);
461    if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
462        pending->completeIo(&ipcIo);
463        delete pending; // XXX: leaking if throwing
464    } else {
465        debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
466               "; ipcIo" << KidIdentifier << '.' << requestId);
467        // nothing we can do about it; completeIo() has been called already
468    }
469}
470
471void
472IpcIoFile::Notify(const int peerId)
473{
474    // TODO: Count and report the total number of notifications, pops, pushes.
475    debugs(47, 7, HERE << "kid" << peerId);
476    Ipc::TypedMsgHdr msg;
477    msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
478    msg.putInt(KidIdentifier);
479    const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
480    Ipc::SendMessage(addr, msg);
481}
482
483void
484IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
485{
486    const int from = msg.getInt();
487    debugs(47, 7, HERE << "from " << from);
488    queue->clearReaderSignal(from);
489    if (IamDiskProcess())
490        DiskerHandleRequests();
491    else
492        HandleResponses("after notification");
493}
494
495/// handles open request timeout
496void
497IpcIoFile::OpenTimeout(void *const param)
498{
499    Must(param);
500    // the pointer is used for comparison only and not dereferenced
501    const IpcIoFile *const ipcIoFile =
502        reinterpret_cast<const IpcIoFile *>(param);
503    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
504            i != WaitingForOpen.end(); ++i) {
505        if (*i == ipcIoFile) {
506            (*i)->openCompleted(NULL);
507            WaitingForOpen.erase(i);
508            break;
509        }
510    }
511}
512
513/// IpcIoFile::checkTimeouts wrapper
514void
515IpcIoFile::CheckTimeouts(void *const param)
516{
517    Must(param);
518    const int diskId = reinterpret_cast<uintptr_t>(param);
519    debugs(47, 7, HERE << "diskId=" << diskId);
520    const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
521    if (i != IpcIoFiles.end())
522        i->second->checkTimeouts();
523}
524
525void
526IpcIoFile::checkTimeouts()
527{
528    timeoutCheckScheduled = false;
529
530    // last chance to recover in case a notification message was lost, etc.
531    const RequestMap::size_type timeoutsBefore = olderRequests->size();
532    HandleResponses("before timeout");
533    const RequestMap::size_type timeoutsNow = olderRequests->size();
534
535    if (timeoutsBefore > timeoutsNow) { // some requests were rescued
536        // notification message lost or significantly delayed?
537        debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
538               " may be too slow or disrupted for about " <<
539               Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
540               " out of " << timeoutsBefore << " I/Os");
541    }
542
543    if (timeoutsNow) {
544        debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
545               timeoutsNow << ' ' << dbName << " I/Os after at least " <<
546               Timeout << "s timeout");
547    }
548
549    // any old request would have timed out by now
550    typedef RequestMap::const_iterator RMCI;
551    for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
552        IpcIoPendingRequest *const pending = i->second;
553
554        const unsigned int requestId = i->first;
555        debugs(47, 7, HERE << "disker timeout; ipcIo" <<
556               KidIdentifier << '.' << requestId);
557
558        pending->completeIo(NULL); // no response
559        delete pending; // XXX: leaking if throwing
560    }
561    olderRequests->clear();
562
563    swap(olderRequests, newerRequests); // switches pointers around
564    if (!olderRequests->empty() && !timeoutCheckScheduled)
565        scheduleTimeoutCheck();
566}
567
568/// prepare to check for timeouts in a little while
569void
570IpcIoFile::scheduleTimeoutCheck()
571{
572    // we check all older requests at once so some may be wait for 2*Timeout
573    eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
574             reinterpret_cast<void *>(diskId), Timeout, 0, false);
575    timeoutCheckScheduled = true;
576}
577
578/// returns and forgets the right IpcIoFile pending request
579IpcIoPendingRequest *
580IpcIoFile::dequeueRequest(const unsigned int requestId)
581{
582    Must(requestId != 0);
583
584    RequestMap *map = NULL;
585    RequestMap::iterator i = requestMap1.find(requestId);
586
587    if (i != requestMap1.end())
588        map = &requestMap1;
589    else {
590        i = requestMap2.find(requestId);
591        if (i != requestMap2.end())
592            map = &requestMap2;
593    }
594
595    if (!map) // not found in both maps
596        return NULL;
597
598    IpcIoPendingRequest *pending = i->second;
599    map->erase(i);
600    return pending;
601}
602
603int
604IpcIoFile::getFD() const
605{
606    assert(false); // not supported; TODO: remove this method from API
607    return -1;
608}
609
610/* IpcIoMsg */
611
612IpcIoMsg::IpcIoMsg():
613    requestId(0),
614    offset(0),
615    len(0),
616    command(IpcIo::cmdNone),
617    xerrno(0)
618{
619    start.tv_sec = 0;
620    start.tv_usec = 0;
621}
622
623/* IpcIoPendingRequest */
624
625IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
626    file(aFile), readRequest(NULL), writeRequest(NULL)
627{
628}
629
630void
631IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
632{
633    if (readRequest)
634        file->readCompleted(readRequest, response);
635    else if (writeRequest)
636        file->writeCompleted(writeRequest, response);
637    else {
638        Must(!response); // only timeouts are handled here
639        file->openCompleted(NULL);
640    }
641}
642
643/* XXX: disker code that should probably be moved elsewhere */
644
645static SBuf DbName; ///< full db file name
646static int TheFile = -1; ///< db file descriptor
647
648static void
649diskerRead(IpcIoMsg &ipcIo)
650{
651    if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
652        ipcIo.len = 0;
653        debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
654        return;
655    }
656
657    char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
658    const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
659    ++statCounter.syscalls.disk.reads;
660    fd_bytes(TheFile, read, FD_READ);
661
662    if (read >= 0) {
663        ipcIo.xerrno = 0;
664        const size_t len = static_cast<size_t>(read); // safe because read > 0
665        debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
666               (len == ipcIo.len ? "all " : "just ") << read);
667        ipcIo.len = len;
668    } else {
669        ipcIo.xerrno = errno;
670        ipcIo.len = 0;
671        debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
672               ipcIo.xerrno);
673    }
674}
675
676/// Tries to write buffer to disk (a few times if needed);
677/// sets ipcIo results, but does no cleanup. The caller must cleanup.
678static void
679diskerWriteAttempts(IpcIoMsg &ipcIo)
680{
681    const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
682    size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
683    size_t wroteSoFar = 0;
684    off_t offset = ipcIo.offset;
685    // Partial writes to disk do happen. It is unlikely that the caller can
686    // handle partial writes by doing something other than writing leftovers
687    // again, so we try to write them ourselves to minimize overheads.
688    const int attemptLimit = 10;
689    for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
690        const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
691        ++statCounter.syscalls.disk.writes;
692        fd_bytes(TheFile, result, FD_WRITE);
693
694        if (result < 0) {
695            ipcIo.xerrno = errno;
696            assert(ipcIo.xerrno);
697            debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
698                   " writing " << toWrite << '/' << ipcIo.len <<
699                   " at " << ipcIo.offset << '+' << wroteSoFar <<
700                   " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
701            ipcIo.len = wroteSoFar;
702            return; // bail on error
703        }
704
705        const size_t wroteNow = static_cast<size_t>(result); // result >= 0
706        ipcIo.xerrno = 0;
707
708        debugs(47,3, "disker" << KidIdentifier << " wrote " <<
709               (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
710               " out of " << toWrite << '/' << ipcIo.len << " at " <<
711               ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
712               " try");
713
714        wroteSoFar += wroteNow;
715
716        if (wroteNow >= toWrite) {
717            ipcIo.xerrno = 0;
718            ipcIo.len = wroteSoFar;
719            return; // wrote everything there was to write
720        }
721
722        buf += wroteNow;
723        offset += wroteNow;
724        toWrite -= wroteNow;
725    }
726
727    debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
728           attemptLimit << " attempts while writing " <<
729           toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
730           wroteSoFar);
731    return; // not a fatal I/O error, unless the caller treats it as such
732}
733
734static void
735diskerWrite(IpcIoMsg &ipcIo)
736{
737    diskerWriteAttempts(ipcIo); // may fail
738    Ipc::Mem::PutPage(ipcIo.page);
739}
740
741void
742IpcIoFile::DiskerHandleMoreRequests(void *source)
743{
744    debugs(47, 7, HERE << "resuming handling requests after " <<
745           static_cast<const char *>(source));
746    DiskerHandleMoreRequestsScheduled = false;
747    IpcIoFile::DiskerHandleRequests();
748}
749
750bool
751IpcIoFile::WaitBeforePop()
752{
753    const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
754    const double maxRate = ioRate/1e3; // req/ms
755
756    // do we need to enforce configured I/O rate?
757    if (maxRate <= 0)
758        return false;
759
760    // is there an I/O request we could potentially delay?
761    int processId;
762    IpcIoMsg ipcIo;
763    if (!queue->peek(processId, ipcIo)) {
764        // unlike pop(), peek() is not reliable and does not block reader
765        // so we must proceed with pop() even if it is likely to fail
766        return false;
767    }
768
769    static timeval LastIo = current_time;
770
771    const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
772    // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
773    const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
774
775    const double credit = ioDuration; // what the last I/O should have cost us
776    const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
777    LastIo = current_time;
778
779    Ipc::QueueReader::Balance &balance = queue->localBalance();
780    balance += static_cast<int64_t>(credit - debit);
781
782    debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
783
784    if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
785        // if the next request is (likely) write and we accumulated
786        // too much time for future slow I/Os, then shed accumulated
787        // time to keep just half of the excess
788        const int64_t toSpend = balance - maxImbalance/2;
789
790        if (toSpend/1e3 > Timeout)
791            debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
792                   "I/O requests for " << (toSpend/1e3) << " seconds " <<
793                   "to obey " << ioRate << "/sec rate limit");
794
795        debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
796               (1e3*maxRate) << "/sec rate");
797        eventAdd("IpcIoFile::DiskerHandleMoreRequests",
798                 &IpcIoFile::DiskerHandleMoreRequests,
799                 const_cast<char*>("rate limiting"),
800                 toSpend/1e3, 0, false);
801        DiskerHandleMoreRequestsScheduled = true;
802        return true;
803    } else if (balance < -maxImbalance) {
804        // do not owe "too much" to avoid "too large" bursts of I/O
805        balance = -maxImbalance;
806    }
807
808    return false;
809}
810
811void
812IpcIoFile::DiskerHandleRequests()
813{
814    // Balance our desire to maximize the number of concurrent I/O requests
815    // (reordred by OS to minimize seek time) with a requirement to
816    // send 1st-I/O notification messages, process Coordinator events, etc.
817    const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
818    const timeval loopStart = current_time;
819
820    int popped = 0;
821    int workerId = 0;
822    IpcIoMsg ipcIo;
823    while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
824        ++popped;
825
826        // at least one I/O per call is guaranteed if the queue is not empty
827        DiskerHandleRequest(workerId, ipcIo);
828
829        getCurrentTime();
830        const double elapsedMsec = tvSubMsec(loopStart, current_time);
831        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
832            if (!DiskerHandleMoreRequestsScheduled) {
833                // the gap must be positive for select(2) to be given a chance
834                const double minBreakSecs = 0.001;
835                eventAdd("IpcIoFile::DiskerHandleMoreRequests",
836                         &IpcIoFile::DiskerHandleMoreRequests,
837                         const_cast<char*>("long I/O loop"),
838                         minBreakSecs, 0, false);
839                DiskerHandleMoreRequestsScheduled = true;
840            }
841            debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
842                   elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
843            break;
844        }
845    }
846
847    // TODO: consider using O_DIRECT with "elevator" optimization where we pop
848    // requests first, then reorder the popped requests to optimize seek time,
849    // then do I/O, then take a break, and come back for the next set of I/O
850    // requests.
851}
852
853/// called when disker receives an I/O request
854void
855IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
856{
857    if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
858        debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
859               " should not receive " << ipcIo.command <<
860               " ipcIo" << workerId << '.' << ipcIo.requestId);
861        return;
862    }
863
864    debugs(47,5, HERE << "disker" << KidIdentifier <<
865           (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
866           ipcIo.len << " at " << ipcIo.offset <<
867           " ipcIo" << workerId << '.' << ipcIo.requestId);
868
869    if (ipcIo.command == IpcIo::cmdRead)
870        diskerRead(ipcIo);
871    else // ipcIo.command == IpcIo::cmdWrite
872        diskerWrite(ipcIo);
873
874    debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
875
876    try {
877        if (queue->push(workerId, ipcIo))
878            Notify(workerId); // must notify worker
879    } catch (const Queue::Full &) {
880        // The worker queue should not overflow because the worker should pop()
881        // before push()ing and because if disker pops N requests at a time,
882        // we should make sure the worker pop() queue length is the worker
883        // push queue length plus N+1. XXX: implement the N+1 difference.
884        debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
885               DbName << " overflow: " <<
886               SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
887
888        // the I/O request we could not push will timeout
889    }
890}
891
892static bool
893DiskerOpen(const SBuf &path, int flags, mode_t mode)
894{
895    assert(TheFile < 0);
896
897    DbName = path;
898    TheFile = file_open(DbName.c_str(), flags);
899
900    if (TheFile < 0) {
901        const int xerrno = errno;
902        debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
903               xstrerr(xerrno));
904        return false;
905    }
906
907    ++store_open_disk_fd;
908    debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
909    return true;
910}
911
912static void
913DiskerClose(const SBuf &path)
914{
915    if (TheFile >= 0) {
916        file_close(TheFile);
917        debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
918        TheFile = -1;
919        --store_open_disk_fd;
920    }
921    DbName.clear();
922}
923
924/// reports our needs for shared memory pages to Ipc::Mem::Pages
925/// and initializes shared memory segments used by IpcIoFile
926class IpcIoRr: public Ipc::Mem::RegisteredRunner
927{
928public:
929    /* RegisteredRunner API */
930    IpcIoRr(): owner(NULL) {}
931    virtual ~IpcIoRr();
932    virtual void claimMemoryNeeds();
933
934protected:
935    /* Ipc::Mem::RegisteredRunner API */
936    virtual void create();
937
938private:
939    Ipc::FewToFewBiQueue::Owner *owner;
940};
941
942RunnerRegistrationEntry(IpcIoRr);
943
944void
945IpcIoRr::claimMemoryNeeds()
946{
947    const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
948                               ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
949    // the maximum number of shared I/O pages is approximately the
950    // number of queue slots, we add a fudge factor to that to account
951    // for corner cases where I/O pages are created before queue
952    // limits are checked or destroyed long after the I/O is dequeued
953    Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
954                           static_cast<int>(itemsCount * 1.1));
955}
956
957void
958IpcIoRr::create()
959{
960    if (Config.cacheSwap.n_strands <= 0)
961        return;
962
963    Must(!owner);
964    owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
965                                       Config.cacheSwap.n_strands,
966                                       1 + Config.workers, sizeof(IpcIoMsg),
967                                       QueueCapacity);
968}
969
970IpcIoRr::~IpcIoRr()
971{
972    delete owner;
973}
974
Note: See TracBrowser for help on using the repository browser.