source: filezilla/trunk/fuentes/src/engine/ratelimiter.cpp @ 130

Last change on this file since 130 was 130, checked in by jrpelegrina, 3 years ago

First release to xenial

File size: 6.6 KB
Line 
1#include <filezilla.h>
2#include "ratelimiter.h"
3
4#include "event_loop.h"
5
6static int const tickDelay = 250;
7
8CRateLimiter::CRateLimiter(CEventLoop& loop, COptionsBase& options)
9        : CEventHandler(loop)
10        , options_(options)
11{
12        RegisterOption(OPTION_SPEEDLIMIT_ENABLE);
13        RegisterOption(OPTION_SPEEDLIMIT_INBOUND);
14        RegisterOption(OPTION_SPEEDLIMIT_OUTBOUND);
15
16        m_tokenDebt[0] = 0;
17        m_tokenDebt[1] = 0;
18}
19
20CRateLimiter::~CRateLimiter()
21{
22        RemoveHandler();
23}
24
25int64_t CRateLimiter::GetLimit(rate_direction direction) const
26{
27        int64_t ret{};
28        if (options_.GetOptionVal(OPTION_SPEEDLIMIT_ENABLE) != 0) {
29                ret = static_cast<int64_t>(options_.GetOptionVal(OPTION_SPEEDLIMIT_INBOUND + direction)) * 1024;
30        }
31
32        return ret;
33}
34
35void CRateLimiter::AddObject(CRateLimiterObject* pObject)
36{
37        scoped_lock lock(sync_);
38
39        m_objectList.push_back(pObject);
40
41        for (int i = 0; i < 2; ++i) {
42                int64_t limit = GetLimit(static_cast<rate_direction>(i));
43                if (limit > 0) {
44                        int64_t tokens = limit / (1000 / tickDelay);
45
46                        tokens /= m_objectList.size();
47                        if (m_tokenDebt[i] > 0) {
48                                if (tokens >= m_tokenDebt[i]) {
49                                        tokens -= m_tokenDebt[i];
50                                        m_tokenDebt[i] = 0;
51                                }
52                                else {
53                                        tokens = 0;
54                                        m_tokenDebt[i] -= tokens;
55                                }
56                        }
57
58                        pObject->m_bytesAvailable[i] = tokens;
59
60                        if (!m_timer)
61                                m_timer = AddTimer(duration::from_milliseconds(tickDelay), false);
62                }
63                else {
64                        pObject->m_bytesAvailable[i] = -1;
65                }
66        }
67}
68
69void CRateLimiter::RemoveObject(CRateLimiterObject* pObject)
70{
71        scoped_lock lock(sync_);
72
73        for (auto iter = m_objectList.begin(); iter != m_objectList.end(); ++iter) {
74                if (*iter == pObject) {
75                        for (int i = 0; i < 2; ++i) {
76                                // If an object already used up some of its assigned tokens, add them to m_tokenDebt,
77                                // so that newly created objects get less initial tokens.
78                                // That ensures that rapidly adding and removing objects does not exceed the rate
79                                int64_t limit = GetLimit(static_cast<rate_direction>(i));
80                                int64_t tokens = limit / (1000 / tickDelay);
81                                tokens /= m_objectList.size();
82                                if ((*iter)->m_bytesAvailable[i] < tokens)
83                                        m_tokenDebt[i] += tokens - (*iter)->m_bytesAvailable[i];
84                        }
85                        m_objectList.erase(iter);
86                        break;
87                }
88        }
89
90        for (int i = 0; i < 2; ++i) {
91                for (auto iter = m_wakeupList[i].begin(); iter != m_wakeupList[i].end(); ++iter) {
92                        if (*iter == pObject) {
93                                m_wakeupList[i].erase(iter);
94                                break;
95                        }
96                }
97        }
98}
99
100void CRateLimiter::OnTimer(timer_id)
101{
102        scoped_lock lock(sync_);
103
104        int64_t const limits[2] = { GetLimit(inbound), GetLimit(outbound) };
105
106        for (int i = 0; i < 2; ++i) {
107                m_tokenDebt[i] = 0;
108
109                if (m_objectList.empty())
110                        continue;
111
112                if (limits[i] == 0) {
113                        for (auto iter = m_objectList.begin(); iter != m_objectList.end(); ++iter) {
114                                (*iter)->m_bytesAvailable[i] = -1;
115                                if ((*iter)->m_waiting[i])
116                                        m_wakeupList[i].push_back(*iter);
117                        }
118                        continue;
119                }
120
121                int64_t tokens = (limits[i] * tickDelay) / 1000;
122                int64_t maxTokens = tokens * GetBucketSize();
123
124                // Get amount of tokens for each object
125                int64_t tokensPerObject = tokens / m_objectList.size();
126
127                if (tokensPerObject == 0)
128                        tokensPerObject = 1;
129                tokens = 0;
130
131                // This list will hold all objects which didn't reach maxTokens
132                std::list<CRateLimiterObject*> unsaturatedObjects;
133
134                for (auto iter = m_objectList.begin(); iter != m_objectList.end(); ++iter) {
135                        if ((*iter)->m_bytesAvailable[i] == -1) {
136                                wxASSERT(!(*iter)->m_waiting[i]);
137                                (*iter)->m_bytesAvailable[i] = tokensPerObject;
138                                unsaturatedObjects.push_back(*iter);
139                        }
140                        else {
141                                (*iter)->m_bytesAvailable[i] += tokensPerObject;
142                                if ((*iter)->m_bytesAvailable[i] > maxTokens)
143                                {
144                                        tokens += (*iter)->m_bytesAvailable[i] - maxTokens;
145                                        (*iter)->m_bytesAvailable[i] = maxTokens;
146                                }
147                                else
148                                        unsaturatedObjects.push_back(*iter);
149
150                                if ((*iter)->m_waiting[i])
151                                        m_wakeupList[i].push_back(*iter);
152                        }
153                }
154
155                // If there are any left-over tokens (in case of objects with a rate below the limit)
156                // assign to the unsaturated sources
157                while (tokens != 0 && !unsaturatedObjects.empty()) {
158                        tokensPerObject = tokens / unsaturatedObjects.size();
159                        if (tokensPerObject == 0)
160                                break;
161                        tokens = 0;
162
163                        std::list<CRateLimiterObject*> objects;
164                        objects.swap(unsaturatedObjects);
165
166                        for (auto iter = objects.begin(); iter != objects.end(); ++iter) {
167                                (*iter)->m_bytesAvailable[i] += tokensPerObject;
168                                if ((*iter)->m_bytesAvailable[i] > maxTokens) {
169                                        tokens += (*iter)->m_bytesAvailable[i] - maxTokens;
170                                        (*iter)->m_bytesAvailable[i] = maxTokens;
171                                }
172                                else
173                                        unsaturatedObjects.push_back(*iter);
174                        }
175                }
176        }
177
178        WakeupWaitingObjects(lock);
179
180        if (m_objectList.empty() || (limits[inbound] == 0 && limits[outbound] == 0)) {
181                if (m_timer) {
182                        StopTimer(m_timer);
183                        m_timer = 0;
184                }
185        }
186}
187
188void CRateLimiter::WakeupWaitingObjects(scoped_lock & l)
189{
190        for (int i = 0; i < 2; ++i) {
191                while (!m_wakeupList[i].empty()) {
192                        CRateLimiterObject* pObject = m_wakeupList[i].front();
193                        m_wakeupList[i].pop_front();
194                        if (!pObject->m_waiting[i])
195                                continue;
196
197                        wxASSERT(pObject->m_bytesAvailable[i] != 0);
198                        pObject->m_waiting[i] = false;
199
200                        l.unlock(); // Do not hold while executing callback
201                        pObject->OnRateAvailable((rate_direction)i);
202                        l.lock();
203                }
204        }
205}
206
207int CRateLimiter::GetBucketSize() const
208{
209        const int burst_tolerance = options_.GetOptionVal(OPTION_SPEEDLIMIT_BURSTTOLERANCE);
210
211        int bucket_size = 1000 / tickDelay;
212        switch (burst_tolerance)
213        {
214        case 1:
215                bucket_size *= 2;
216                break;
217        case 2:
218                bucket_size *= 5;
219                break;
220        default:
221                break;
222        }
223
224        return bucket_size;
225}
226
227void CRateLimiter::operator()(CEventBase const& ev)
228{
229        if (Dispatch<CTimerEvent>(ev, this, &CRateLimiter::OnTimer)) {
230                return;
231        }
232        Dispatch<CRateLimitChangedEvent>(ev, this, &CRateLimiter::OnRateChanged);
233}
234
235void CRateLimiter::OnRateChanged()
236{
237        scoped_lock lock(sync_);
238        if (GetLimit(inbound) > 0 || GetLimit(outbound) > 0) {
239                if (!m_timer)
240                        m_timer = AddTimer(duration::from_milliseconds(tickDelay), false);
241        }
242}
243
244void CRateLimiter::OnOptionsChanged(changed_options_t const&)
245{
246        SendEvent<CRateLimitChangedEvent>();
247}
248
249CRateLimiterObject::CRateLimiterObject()
250{
251        for (int i = 0; i < 2; ++i) {
252                m_waiting[i] = false;
253                m_bytesAvailable[i] = -1;
254        }
255}
256
257void CRateLimiterObject::UpdateUsage(CRateLimiter::rate_direction direction, int usedBytes)
258{
259        wxASSERT(usedBytes <= m_bytesAvailable[direction]);
260        if (usedBytes > m_bytesAvailable[direction])
261                m_bytesAvailable[direction] = 0;
262        else
263                m_bytesAvailable[direction] -= usedBytes;
264}
265
266void CRateLimiterObject::Wait(CRateLimiter::rate_direction direction)
267{
268        wxASSERT(m_bytesAvailable[direction] == 0);
269        m_waiting[direction] = true;
270}
271
272bool CRateLimiterObject::IsWaiting(CRateLimiter::rate_direction direction) const
273{
274        return m_waiting[direction];
275}
Note: See TracBrowser for help on using the repository browser.