34 #include <boost/asio/io_service.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/range/adaptor/map.hpp>
43 constexpr
double SegmentFetcher::MIN_SSTHRESH;
49 NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
53 NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
57 NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
60 if (mdCoef < 0.0 || mdCoef > 1.0) {
61 NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
65 SegmentFetcher::SegmentFetcher(
Face& face,
70 , m_scheduler(m_face.getIoService())
71 , m_validator(validator)
73 , m_timeLastSegmentReceived(
time::steady_clock::now())
75 , m_cwnd(options.initCwnd)
76 , m_ssthresh(options.initSsthresh)
77 , m_nSegmentsInFlight(0)
88 shared_ptr<SegmentFetcher>
94 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
95 fetcher->m_this = fetcher;
96 fetcher->fetchFirstSegment(baseInterest,
false);
107 m_pendingSegments.clear();
112 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
114 auto self = weakSelf.lock();
115 return self ==
nullptr ||
self->m_this ==
nullptr;
119 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
122 interest.setCanBePrefix(
true);
123 interest.setMustBeFresh(
true);
125 if (isRetransmission) {
126 interest.refreshNonce();
129 sendInterest(0, interest, isRetransmission);
133 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
135 if (checkAllSegmentsReceived()) {
137 return finalizeFetch();
140 int64_t availableWindowSize =
static_cast<int64_t
>(m_cwnd) - m_nSegmentsInFlight;
141 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
143 while (availableWindowSize > 0) {
144 if (!m_retxQueue.empty()) {
145 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
147 if (pendingSegmentIt == m_pendingSegments.end()) {
151 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
154 else if (m_nSegments == 0 || m_nextSegmentNum <
static_cast<uint64_t
>(m_nSegments)) {
155 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
160 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
165 availableWindowSize--;
168 for (
const auto& segment : segmentsToRequest) {
170 interest.setName(
Name(m_versionedDataName).appendSegment(segment.first));
171 interest.setCanBePrefix(
false);
172 interest.setMustBeFresh(
false);
174 interest.refreshNonce();
175 sendInterest(segment.first, interest, segment.second);
180 SegmentFetcher::sendInterest(uint64_t segNum,
const Interest& interest,
bool isRetransmission)
182 weak_ptr<SegmentFetcher> weakSelf = m_this;
184 ++m_nSegmentsInFlight;
186 [
this, weakSelf] (
const Interest& interest,
const Data& data) {
187 afterSegmentReceivedCb(interest, data, weakSelf);
190 afterNackReceivedCb(interest, nack, weakSelf);
195 auto timeoutEvent = m_scheduler.
schedule(timeout, [
this, interest, weakSelf] {
196 afterTimeoutCb(interest, weakSelf);
199 if (isRetransmission) {
200 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
205 pendingInterest, timeoutEvent};
206 bool isNew = m_pendingSegments.emplace(segNum,
std::move(pendingSegment)).second;
208 m_highInterest = segNum;
212 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
213 const weak_ptr<SegmentFetcher>& weakSelf)
215 if (shouldStop(weakSelf))
218 BOOST_ASSERT(m_nSegmentsInFlight > 0);
219 m_nSegmentsInFlight--;
222 if (!currentSegmentComponent.
isSegment()) {
226 uint64_t currentSegment = currentSegmentComponent.
toSegment();
229 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230 if (m_receivedSegments.size() > 0) {
231 pendingSegmentIt = m_pendingSegments.find(currentSegment);
234 pendingSegmentIt = m_pendingSegments.begin();
237 if (pendingSegmentIt == m_pendingSegments.end()) {
241 pendingSegmentIt->second.timeoutEvent.cancel();
246 bind(&SegmentFetcher::afterValidationSuccess,
this, _1, origInterest,
247 pendingSegmentIt, weakSelf),
248 bind(&SegmentFetcher::afterValidationFailure,
this, _1, _2, weakSelf));
252 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
253 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
254 const weak_ptr<SegmentFetcher>& weakSelf)
256 if (shouldStop(weakSelf))
266 uint64_t currentSegment = data.getName().get(-1).toSegment();
268 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
269 BOOST_ASSERT(m_nSegmentsInFlight >= 0);
270 m_rttEstimator.
addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
271 static_cast<size_t>(m_nSegmentsInFlight) + 1);
275 m_pendingSegments.erase(pendingSegmentIt);
278 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
279 std::forward_as_tuple(currentSegment),
280 std::forward_as_tuple(data.getContent().value_size()));
281 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
282 receivedSegmentIt.first->second.begin());
283 m_nBytesReceived += data.getContent().value_size();
286 if (data.getFinalBlock()) {
287 if (!data.getFinalBlock()->isSegment()) {
289 "Received FinalBlockId did not contain a segment component");
292 if (data.getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
293 m_nSegments = data.getFinalBlock()->toSegment() + 1;
294 cancelExcessInFlightSegments();
298 if (m_receivedSegments.size() == 1) {
299 m_versionedDataName = data.getName().
getPrefix(-1);
300 if (currentSegment == 0) {
306 if (m_highData < currentSegment) {
307 m_highData = currentSegment;
317 fetchSegmentsInWindow(origInterest);
321 SegmentFetcher::afterValidationFailure(
const Data& data,
322 const security::v2::ValidationError& error,
323 const weak_ptr<SegmentFetcher>& weakSelf)
325 if (shouldStop(weakSelf))
332 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
333 const weak_ptr<SegmentFetcher>& weakSelf)
335 if (shouldStop(weakSelf))
340 BOOST_ASSERT(m_nSegmentsInFlight > 0);
341 m_nSegmentsInFlight--;
346 afterNackOrTimeout(origInterest);
355 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
356 const weak_ptr<SegmentFetcher>& weakSelf)
358 if (shouldStop(weakSelf))
363 BOOST_ASSERT(m_nSegmentsInFlight > 0);
364 m_nSegmentsInFlight--;
365 afterNackOrTimeout(origInterest);
369 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
377 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
378 BOOST_ASSERT(m_pendingSegments.size() > 0);
380 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
381 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
384 BOOST_ASSERT(m_pendingSegments.size() > 0);
385 pendingSegmentIt = m_pendingSegments.begin();
389 pendingSegmentIt->second.timeoutEvent.cancel();
390 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
394 if (m_receivedSegments.size() == 0) {
396 fetchFirstSegment(origInterest,
true);
400 m_retxQueue.push(pendingSegmentIt->first);
401 fetchSegmentsInWindow(origInterest);
406 SegmentFetcher::finalizeFetch()
411 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
413 for (int64_t i = 0; i < m_nSegments; i++) {
414 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
422 SegmentFetcher::windowIncrease()
425 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
429 if (m_cwnd < m_ssthresh) {
430 m_cwnd += m_options.
aiStep;
433 m_cwnd += m_options.
aiStep / std::floor(m_cwnd);
438 SegmentFetcher::windowDecrease()
440 if (m_options.
disableCwa || m_highData > m_recPoint) {
441 m_recPoint = m_highInterest;
444 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
449 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.
mdCoef);
455 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
462 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
463 const PendingInterestHandle& pendingInterest,
466 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
467 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
468 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
469 pendingSegmentIt->second.state = SegmentState::Retransmitted;
470 pendingSegmentIt->second.hdl = pendingInterest;
471 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
475 SegmentFetcher::cancelExcessInFlightSegments()
477 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
478 if (it->first >=
static_cast<uint64_t
>(m_nSegments)) {
479 it = m_pendingSegments.erase(it);
480 BOOST_ASSERT(m_nSegmentsInFlight > 0);
481 m_nSegmentsInFlight--;
490 SegmentFetcher::checkAllSegmentsReceived()
492 bool haveReceivedAllSegments =
false;
494 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
495 haveReceivedAllSegments =
true;
497 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
498 if (m_receivedSegments.count(i) == 0) {
500 haveReceivedAllSegments =
false;
505 return haveReceivedAllSegments;
509 SegmentFetcher::getEstimatedRto()
514 time::duration_cast<time::milliseconds>(m_rttEstimator.
getEstimatedRto()));