34 #include <boost/asio/io_service.hpp> 35 #include <boost/lexical_cast.hpp> 36 #include <boost/range/adaptor/map.hpp> 43 constexpr
double SegmentFetcher::MIN_SSTHRESH;
49 BOOST_THROW_EXCEPTION(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
53 BOOST_THROW_EXCEPTION(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
57 BOOST_THROW_EXCEPTION(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
60 if (mdCoef < 0.0 || mdCoef > 1.0) {
61 BOOST_THROW_EXCEPTION(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
65 SegmentFetcher::SegmentFetcher(
Face& face,
70 , m_scheduler(m_face.getIoService())
71 , m_validator(validator)
72 , m_rttEstimator(options.rttOptions)
73 , m_timeLastSegmentReceived(
time::steady_clock::now())
75 , m_cwnd(options.initCwnd)
76 , m_ssthresh(options.initSsthresh)
77 , m_nSegmentsInFlight(0)
88 shared_ptr<SegmentFetcher>
94 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
95 fetcher->m_this = fetcher;
96 fetcher->fetchFirstSegment(baseInterest,
false);
107 for (
const auto& pendingSegment : m_pendingSegments | boost::adaptors::map_values) {
109 if (pendingSegment.timeoutEvent) {
110 m_scheduler.
cancelEvent(pendingSegment.timeoutEvent);
113 m_scheduler.
scheduleEvent(0_s, [
self = std::move(m_this)] {});
117 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
119 auto self = weakSelf.lock();
120 return self ==
nullptr ||
self->m_this ==
nullptr;
124 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
127 interest.setCanBePrefix(
true);
128 interest.setMustBeFresh(
true);
130 if (isRetransmission) {
131 interest.refreshNonce();
134 weak_ptr<SegmentFetcher> weakSelf = m_this;
136 m_nSegmentsInFlight++;
138 bind(&SegmentFetcher::afterSegmentReceivedCb,
139 this, _1, _2, weakSelf),
140 bind(&SegmentFetcher::afterNackReceivedCb,
141 this, _1, _2, weakSelf),
145 bind(&SegmentFetcher::afterTimeoutCb,
this, interest, weakSelf));
147 if (isRetransmission) {
148 updateRetransmittedSegment(0, pendingInterest, timeoutEvent);
151 BOOST_ASSERT(m_pendingSegments.count(0) == 0);
153 pendingInterest, timeoutEvent});
158 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
160 weak_ptr<SegmentFetcher> weakSelf = m_this;
162 if (checkAllSegmentsReceived()) {
164 return finalizeFetch();
167 int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
168 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
170 while (availableWindowSize > 0) {
171 if (!m_retxQueue.empty()) {
172 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
174 if (pendingSegmentIt == m_pendingSegments.end()) {
178 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
179 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
181 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
182 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
187 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
192 availableWindowSize--;
195 for (
const auto& segment : segmentsToRequest) {
197 interest.setName(
Name(m_versionedDataName).appendSegment(segment.first));
198 interest.setCanBePrefix(
false);
199 interest.setMustBeFresh(
false);
201 interest.refreshNonce();
203 m_nSegmentsInFlight++;
205 bind(&SegmentFetcher::afterSegmentReceivedCb,
206 this, _1, _2, weakSelf),
207 bind(&SegmentFetcher::afterNackReceivedCb,
208 this, _1, _2, weakSelf),
212 bind(&SegmentFetcher::afterTimeoutCb,
this, interest, weakSelf));
214 if (segment.second) {
215 updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
218 BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
219 m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
220 time::steady_clock::now(),
221 pendingInterest, timeoutEvent});
222 m_highInterest = segment.first;
228 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
229 const weak_ptr<SegmentFetcher>& weakSelf)
231 if (shouldStop(weakSelf))
234 BOOST_ASSERT(m_nSegmentsInFlight > 0);
235 m_nSegmentsInFlight--;
238 if (!currentSegmentComponent.
isSegment()) {
242 uint64_t currentSegment = currentSegmentComponent.
toSegment();
246 if (m_receivedSegments.size() > 0) {
247 pendingSegmentIt = m_pendingSegments.find(currentSegment);
250 pendingSegmentIt = m_pendingSegments.begin();
253 if (pendingSegmentIt == m_pendingSegments.end()) {
260 pendingSegmentIt->second.timeoutEvent.cancel();
263 bind(&SegmentFetcher::afterValidationSuccess,
this, _1, origInterest,
264 pendingSegmentIt, weakSelf),
265 bind(&SegmentFetcher::afterValidationFailure,
this, _1, _2, weakSelf));
269 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
271 const weak_ptr<SegmentFetcher>& weakSelf)
273 if (shouldStop(weakSelf))
283 uint64_t currentSegment = data.getName().get(-1).toSegment();
285 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
286 m_rttEstimator.
addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
287 std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
291 m_pendingSegments.erase(pendingSegmentIt);
294 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
295 std::forward_as_tuple(currentSegment),
296 std::forward_as_tuple(data.getContent().value_size()));
297 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
298 receivedSegmentIt.first->second.begin());
299 m_nBytesReceived += data.getContent().value_size();
302 if (data.getFinalBlock()) {
303 if (!data.getFinalBlock()->isSegment()) {
305 "Received FinalBlockId did not contain a segment component");
308 if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
309 m_nSegments = data.getFinalBlock()->toSegment() + 1;
310 cancelExcessInFlightSegments();
314 if (m_receivedSegments.size() == 1) {
315 m_versionedDataName = data.getName().
getPrefix(-1);
316 if (currentSegment == 0) {
322 if (m_highData < currentSegment) {
323 m_highData = currentSegment;
333 fetchSegmentsInWindow(origInterest);
337 SegmentFetcher::afterValidationFailure(
const Data& data,
338 const security::v2::ValidationError& error,
339 const weak_ptr<SegmentFetcher>& weakSelf)
341 if (shouldStop(weakSelf))
348 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
349 const weak_ptr<SegmentFetcher>& weakSelf)
351 if (shouldStop(weakSelf))
356 BOOST_ASSERT(m_nSegmentsInFlight > 0);
357 m_nSegmentsInFlight--;
362 afterNackOrTimeout(origInterest);
371 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
372 const weak_ptr<SegmentFetcher>& weakSelf)
374 if (shouldStop(weakSelf))
379 BOOST_ASSERT(m_nSegmentsInFlight > 0);
380 m_nSegmentsInFlight--;
381 afterNackOrTimeout(origInterest);
385 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
394 BOOST_ASSERT(m_pendingSegments.size() > 0);
396 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
397 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
400 BOOST_ASSERT(m_pendingSegments.size() > 0);
401 pendingSegmentIt = m_pendingSegments.begin();
405 pendingSegmentIt->second.timeoutEvent.cancel();
406 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
410 if (m_receivedSegments.size() == 0) {
412 fetchFirstSegment(origInterest,
true);
416 m_retxQueue.push(pendingSegmentIt->first);
417 fetchSegmentsInWindow(origInterest);
422 SegmentFetcher::finalizeFetch()
427 BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
429 for (int64_t i = 0; i < m_nSegments; i++) {
430 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
438 SegmentFetcher::windowIncrease()
441 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
445 if (m_cwnd < m_ssthresh) {
446 m_cwnd += m_options.
aiStep;
449 m_cwnd += m_options.
aiStep / std::floor(m_cwnd);
454 SegmentFetcher::windowDecrease()
456 if (m_options.
disableCwa || m_highData > m_recPoint) {
457 m_recPoint = m_highInterest;
460 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
465 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.
mdCoef);
471 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
478 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
479 const PendingInterestId* pendingInterest,
480 scheduler::EventId timeoutEvent)
482 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
483 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
484 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
485 pendingSegmentIt->second.state = SegmentState::Retransmitted;
487 pendingSegmentIt->second.id = pendingInterest;
488 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
492 SegmentFetcher::cancelExcessInFlightSegments()
494 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
495 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
497 if (it->second.timeoutEvent) {
500 it = m_pendingSegments.erase(it);
501 BOOST_ASSERT(m_nSegmentsInFlight > 0);
502 m_nSegmentsInFlight--;
511 SegmentFetcher::checkAllSegmentsReceived()
513 bool haveReceivedAllSegments =
false;
515 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
516 haveReceivedAllSegments =
true;
518 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
519 if (m_receivedSegments.count(i) == 0) {
521 haveReceivedAllSegments =
false;
526 return haveReceivedAllSegments;
530 SegmentFetcher::getEstimatedRto()
535 time::duration_cast<time::milliseconds>(m_rttEstimator.
getEstimatedRto()));
PartialName getPrefix(ssize_t nComponents) const
Extract a prefix of the name.
Copyright (c) 2011-2015 Regents of the University of California.
time::milliseconds interestLifetime
lifetime of sent Interests - independent of Interest timeout
static time_point now() noexcept
An unrecoverable Nack was received during retrieval.
double mdCoef
multiplicative decrease coefficient
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data.
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
bool useConstantInterestTimeout
if true, Interest timeout is kept at maxTimeout
Utility class to fetch the latest version of a segmented object.
Represents an Interest packet.
void stop()
Stops fetching.
EventId scheduleEvent(time::nanoseconds after, const EventCallback &callback)
Schedule a one-time event after the specified delay.
void backoffRto()
Backoff RTO by a factor of Options::rtoBackoffMultiplier.
represents a Network Nack
NackReason getReason() const
Table::const_iterator iterator
double aiStep
additive increase step (in segments)
bool disableCwa
disable Conservative Window Adaptation
void addMeasurement(MillisecondsDouble rtt, size_t nExpectedSamples)
Add a new RTT measurement to the estimator.
const Block & get(uint32_t type) const
Get the first sub element of specified TLV-TYPE.
One of the retrieved segments failed user-provided validation.
bool isSegment() const
Check if the component is segment number per NDN naming conventions.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked.
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
Provide a communication channel with local or remote NDN forwarder.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
One of the retrieved Data packets lacked a segment number in the last Name component (excl....
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
double initCwnd
initial congestion window size
bool ignoreCongMarks
disable window decrease after congestion mark received
Represents a name component.
bool resetCwndToInit
reduce cwnd to initCwnd when loss event occurs
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out.
void cancelEvent(const EventId &eid)
Cancel a scheduled event.
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error.
A received FinalBlockId did not contain a segment component.
MillisecondsDouble getEstimatedRto() const
Returns the estimated RTO value.
bool useConstantCwnd
if true, window size is kept at initCwnd
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
Interface for validating data and interest packets.