source: squid-ssl/trunk/fuentes/src/DiskIO/DiskDaemon/DiskdIOStrategy.cc @ 5495

Last change on this file since 5495 was 5495, checked in by Juanma, 2 years ago

Initial release

File size: 13.2 KB
Line 
1/*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 79    Squid-side DISKD I/O functions. */
10
11#include "squid.h"
12#include "comm/Loops.h"
13#include "ConfigOption.h"
14#include "diomsg.h"
15#include "DiskdFile.h"
16#include "DiskdIOStrategy.h"
17#include "DiskIO/DiskFile.h"
18#include "fd.h"
19#include "SquidConfig.h"
20#include "SquidIpc.h"
21#include "SquidTime.h"
22#include "StatCounters.h"
23#include "Store.h"
24#include "unlinkd.h"
25
26#include <cerrno>
27#if HAVE_SYS_IPC_H
28#include <sys/ipc.h>
29#endif
30#if HAVE_SYS_MSG_H
31#include <sys/msg.h>
32#endif
33#if HAVE_SYS_SHM_H
34#include <sys/shm.h>
35#endif
36
37diskd_stats_t diskd_stats;
38
39size_t DiskdIOStrategy::nextInstanceID (0);
40const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
41
42size_t
43DiskdIOStrategy::newInstance()
44{
45    return ++nextInstanceID;
46}
47
48bool
49DiskdIOStrategy::shedLoad()
50{
51    /*
52     * Fail on open() if there are too many requests queued.
53     */
54
55    if (away > magic1) {
56        debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
57
58        return true;
59    }
60
61    return false;
62}
63
64int
65DiskdIOStrategy::load()
66{
67    /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
68    /* the parse function guarantees magic2 is positivie */
69    return away * 1000 / magic2;
70}
71
72void
73DiskdIOStrategy::openFailed()
74{
75    ++diskd_stats.open_fail_queue_len;
76}
77
78DiskFile::Pointer
79DiskdIOStrategy::newFile(char const *path)
80{
81    if (shedLoad()) {
82        openFailed();
83        return NULL;
84    }
85
86    return new DiskdFile (path, this);
87}
88
89DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
90{}
91
92bool
93DiskdIOStrategy::unlinkdUseful() const
94{
95    return true;
96}
97
98void
99DiskdIOStrategy::unlinkFile(char const *path)
100{
101    if (shedLoad()) {
102        /* Damn, we need to issue a sync unlink here :( */
103        debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
104        unlinkdUnlink(path);
105        return;
106    }
107
108    /* We can attempt a diskd unlink */
109    int x;
110
111    ssize_t shm_offset;
112
113    char *buf;
114
115    buf = (char *)shm.get(&shm_offset);
116
117    xstrncpy(buf, path, SHMBUF_BLKSZ);
118
119    x = send(_MQD_UNLINK,
120             0,
121             (StoreIOState::Pointer )NULL,
122             0,
123             0,
124             shm_offset);
125
126    if (x < 0) {
127        debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerror());
128        ::unlink(buf);      /* XXX EWW! */
129        //        shm.put (shm_offset);
130    }
131
132    ++diskd_stats.unlink.ops;
133}
134
135void
136DiskdIOStrategy::init()
137{
138    int pid;
139    void * hIpc;
140    int rfd;
141    int ikey;
142    const char *args[5];
143    char skey1[32];
144    char skey2[32];
145    char skey3[32];
146    Ip::Address localhost;
147
148    ikey = (getpid() << 10) + (instanceID << 2);
149    ikey &= 0x7fffffff;
150    smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
151
152    if (smsgid < 0) {
153        debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
154        fatal("msgget failed");
155    }
156
157    rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
158
159    if (rmsgid < 0) {
160        debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
161        fatal("msgget failed");
162    }
163
164    shm.init(ikey, magic2);
165    snprintf(skey1, 32, "%d", ikey);
166    snprintf(skey2, 32, "%d", ikey + 1);
167    snprintf(skey3, 32, "%d", ikey + 2);
168    args[0] = "diskd";
169    args[1] = skey1;
170    args[2] = skey2;
171    args[3] = skey3;
172    args[4] = NULL;
173    localhost.setLocalhost();
174    pid = ipcCreate(IPC_STREAM,
175                    Config.Program.diskd,
176                    args,
177                    "diskd",
178                    localhost,
179                    &rfd,
180                    &wfd,
181                    &hIpc);
182
183    if (pid < 0)
184        fatalf("execl: %s", Config.Program.diskd);
185
186    if (rfd != wfd)
187        comm_close(rfd);
188
189    fd_note(wfd, "squid -> diskd");
190
191    commUnsetFdTimeout(wfd);
192    commSetNonBlocking(wfd);
193    Comm::QuickPollRequired();
194}
195
196/*
197 * SHM manipulation routines
198 */
199void
200SharedMemory::put(ssize_t offset)
201{
202    int i;
203    assert(offset >= 0);
204    assert(offset < nbufs * SHMBUF_BLKSZ);
205    i = offset / SHMBUF_BLKSZ;
206    assert(i < nbufs);
207    assert(CBIT_TEST(inuse_map, i));
208    CBIT_CLR(inuse_map, i);
209    --diskd_stats.shmbuf_count;
210}
211
212void *
213
214SharedMemory::get(ssize_t * shm_offset)
215{
216    char *aBuf = NULL;
217    int i;
218
219    for (i = 0; i < nbufs; ++i) {
220        if (CBIT_TEST(inuse_map, i))
221            continue;
222
223        CBIT_SET(inuse_map, i);
224
225        *shm_offset = i * SHMBUF_BLKSZ;
226
227        aBuf = buf + (*shm_offset);
228
229        break;
230    }
231
232    assert(aBuf);
233    assert(aBuf >= buf);
234    assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
235    ++diskd_stats.shmbuf_count;
236
237    if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
238        diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
239
240    return aBuf;
241}
242
243void
244SharedMemory::init(int ikey, int magic2)
245{
246    nbufs = (int)(magic2 * 1.3);
247    id = shmget((key_t) (ikey + 2),
248                nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
249
250    if (id < 0) {
251        debugs(50, DBG_CRITICAL, "storeDiskdInit: shmget: " << xstrerror());
252        fatal("shmget failed");
253    }
254
255    buf = (char *)shmat(id, NULL, 0);
256
257    if (buf == (void *) -1) {
258        debugs(50, DBG_CRITICAL, "storeDiskdInit: shmat: " << xstrerror());
259        fatal("shmat failed");
260    }
261
262    inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
263    diskd_stats.shmbuf_count += nbufs;
264
265    for (int i = 0; i < nbufs; ++i) {
266        CBIT_SET(inuse_map, i);
267        put (i * SHMBUF_BLKSZ);
268    }
269}
270
271void
272DiskdIOStrategy::unlinkDone(diomsg * M)
273{
274    debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
275    ++statCounter.syscalls.disk.unlinks;
276
277    if (M->status < 0)
278        ++diskd_stats.unlink.fail;
279    else
280        ++diskd_stats.unlink.success;
281}
282
283void
284DiskdIOStrategy::handle(diomsg * M)
285{
286    if (!cbdataReferenceValid (M->callback_data)) {
287        /* I.e. already closed file
288         * - say when we have a error opening after
289         *   a read was already queued
290         */
291        debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
292        cbdataReferenceDone (M->callback_data);
293        return;
294    }
295
296    /* set errno passed from diskd.  makes debugging more meaningful */
297    if (M->status < 0)
298        errno = -M->status;
299
300    if (M->newstyle) {
301        DiskdFile *theFile = (DiskdFile *)M->callback_data;
302        theFile->unlock();
303        theFile->completed (M);
304    } else
305        switch (M->mtype) {
306
307        case _MQD_OPEN:
308
309        case _MQD_CREATE:
310
311        case _MQD_CLOSE:
312
313        case _MQD_READ:
314
315        case _MQD_WRITE:
316            assert (0);
317            break;
318
319        case _MQD_UNLINK:
320            unlinkDone(M);
321            break;
322
323        default:
324            assert(0);
325            break;
326        }
327
328    cbdataReferenceDone (M->callback_data);
329}
330
331int
332DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
333{
334    diomsg M;
335    M.callback_data = cbdataReference(theFile);
336    theFile->lock();
337    M.requestor = requestor;
338    M.newstyle = true;
339
340    if (requestor)
341        requestor->lock();
342
343    return SEND(&M, mtype, id, size, offset, shm_offset);
344}
345
346int
347DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
348{
349    diomsg M;
350    M.callback_data = cbdataReference(sio.getRaw());
351    M.newstyle = false;
352
353    return SEND(&M, mtype, id, size, offset, shm_offset);
354}
355
356int
357DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
358{
359    static int send_errors = 0;
360    static int last_seq_no = 0;
361    static int seq_no = 0;
362    int x;
363
364    M->mtype = mtype;
365    M->size = size;
366    M->offset = offset;
367    M->status = -1;
368    M->shm_offset = (int) shm_offset;
369    M->id = id;
370    M->seq_no = ++seq_no;
371
372    if (M->seq_no < last_seq_no)
373        debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
374
375    x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
376
377    last_seq_no = M->seq_no;
378
379    if (0 == x) {
380        ++diskd_stats.sent_count;
381        ++away;
382    } else {
383        debugs(79, DBG_IMPORTANT, "storeDiskdSend: msgsnd: " << xstrerror());
384        cbdataReferenceDone(M->callback_data);
385        ++send_errors;
386        assert(send_errors < 100);
387        if (shm_offset > -1)
388            shm.put(shm_offset);
389    }
390
391    /*
392     * We have to drain the queue here if necessary.  If we don't,
393     * then we can have a lot of messages in the queue (probably
394     * up to 2*magic1) and we can run out of shared memory buffers.
395     */
396    /*
397     * Note that we call Store::Root().callbackk (for all SDs), rather
398     * than callback for just this SD, so that while
399     * we're "blocking" on this SD we can also handle callbacks
400     * from other SDs that might be ready.
401     */
402
403    struct timeval delay = {0, 1};
404
405    while (away > magic2) {
406        select(0, NULL, NULL, NULL, &delay);
407        Store::Root().callback();
408
409        if (delay.tv_usec < 1000000)
410            delay.tv_usec <<= 1;
411    }
412
413    return x;
414}
415
416ConfigOption *
417DiskdIOStrategy::getOptionTree() const
418{
419    ConfigOptionVector *result = new ConfigOptionVector;
420    result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
421    result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
422    return result;
423}
424
425bool
426DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
427{
428    if (strcmp(name, "Q1") != 0)
429        return false;
430
431    int old_magic1 = magic1;
432
433    magic1 = atoi(value);
434
435    if (!isaReconfig)
436        return true;
437
438    if (old_magic1 < magic1) {
439        /*
440        * This is because shm.nbufs is computed at startup, when
441        * we call shmget().  We can't increase the Q1/Q2 parameters
442        * beyond their initial values because then we might have
443        * more "Q2 messages" than shared memory chunks, and this
444        * will cause an assertion in storeDiskdShmGet().
445        */
446        /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
447        debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
448        magic1 = old_magic1;
449        return true;
450    }
451
452    if (old_magic1 != magic1)
453        debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
454
455    return true;
456}
457
458void
459DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
460{
461    storeAppendPrintf(e, " Q1=%d", magic1);
462}
463
464bool
465DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
466{
467    if (strcmp(name, "Q2") != 0)
468        return false;
469
470    int old_magic2 = magic2;
471
472    magic2 = atoi(value);
473
474    if (!isaReconfig)
475        return true;
476
477    if (old_magic2 < magic2) {
478        /* See comments in Q1 function above */
479        debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
480        magic2 = old_magic2;
481        return true;
482    }
483
484    if (old_magic2 != magic2)
485        debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
486
487    return true;
488}
489
490void
491DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
492{
493    storeAppendPrintf(e, " Q2=%d", magic2);
494}
495
496/*
497 * Sync any pending data. We just sit around and read the queue
498 * until the data has finished writing.
499 */
500void
501DiskdIOStrategy::sync()
502{
503    static time_t lastmsg = 0;
504
505    while (away > 0) {
506        if (squid_curtime > lastmsg) {
507            debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
508            lastmsg = squid_curtime;
509        }
510
511        callback();
512    }
513}
514
515/*
516 * Handle callbacks. If we have more than magic2 requests away, we block
517 * until the queue is below magic2. Otherwise, we simply return when we
518 * don't get a message.
519 */
520
521int
522DiskdIOStrategy::callback()
523{
524    diomsg M;
525    int x;
526    int retval = 0;
527
528    if (away >= magic2) {
529        ++diskd_stats.block_queue_len;
530        retval = 1;
531        /* We might not have anything to do, but our queue
532         * is full.. */
533    }
534
535    if (diskd_stats.sent_count - diskd_stats.recv_count >
536            diskd_stats.max_away) {
537        diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
538    }
539
540    while (1) {
541#ifdef  ALWAYS_ZERO_BUFFERS
542        memset(&M, '\0', sizeof(M));
543#endif
544
545        x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
546
547        if (x < 0)
548            break;
549        else if (x != diomsg::msg_snd_rcv_sz) {
550            debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
551            break;
552        }
553
554        ++diskd_stats.recv_count;
555        --away;
556        handle(&M);
557        retval = 1;     /* Return that we've actually done some work */
558
559        if (M.shm_offset > -1)
560            shm.put ((off_t) M.shm_offset);
561    }
562
563    return retval;
564}
565
566void
567DiskdIOStrategy::statfs(StoreEntry & sentry)const
568{
569    storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
570}
571
Note: See TracBrowser for help on using the repository browser.