30 #include <boost/asio/io_service.hpp> 31 #include <boost/lexical_cast.hpp> 32 #include <boost/range/adaptor/map.hpp> 39 constexpr
double SegmentFetcher::MIN_SSTHRESH;
45 NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
49 NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
53 NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
56 if (mdCoef < 0.0 || mdCoef > 1.0) {
57 NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
61 SegmentFetcher::SegmentFetcher(
Face& face,
66 , m_scheduler(m_face.getIoService())
67 , m_validator(validator)
68 , m_rttEstimator(make_shared<RttEstimator::Options>(options.
rttOptions))
76 shared_ptr<SegmentFetcher>
82 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
83 fetcher->m_this = fetcher;
84 fetcher->fetchFirstSegment(baseInterest,
false);
95 m_pendingSegments.clear();
96 m_scheduler.schedule(0_s, [
self =
std::move(m_this)] {});
100 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
102 auto self = weakSelf.lock();
103 return self ==
nullptr ||
self->m_this ==
nullptr;
107 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
113 if (isRetransmission) {
117 sendInterest(0, interest, isRetransmission);
121 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
123 if (checkAllSegmentsReceived()) {
125 return finalizeFetch();
128 int64_t availableWindowSize;
129 if (m_options.inOrder) {
130 availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
133 availableWindowSize =
static_cast<int64_t
>(m_cwnd);
135 availableWindowSize -= m_nSegmentsInFlight;
137 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
139 while (availableWindowSize > 0) {
140 if (!m_retxQueue.empty()) {
141 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
143 if (pendingSegmentIt == m_pendingSegments.end()) {
147 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
148 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
150 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
151 if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
156 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
161 availableWindowSize--;
164 for (
const auto& segment : segmentsToRequest) {
166 interest.
setName(
Name(m_versionedDataName).appendSegment(segment.first));
171 sendInterest(segment.first, interest, segment.second);
176 SegmentFetcher::sendInterest(uint64_t segNum,
const Interest& interest,
bool isRetransmission)
178 weak_ptr<SegmentFetcher> weakSelf = m_this;
180 ++m_nSegmentsInFlight;
181 auto pendingInterest = m_face.expressInterest(interest,
182 [
this, weakSelf] (
const Interest& interest,
const Data& data) {
183 afterSegmentReceivedCb(interest, data, weakSelf);
186 afterNackReceivedCb(interest, nack, weakSelf);
190 auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
191 auto timeoutEvent = m_scheduler.schedule(
timeout, [
this, interest, weakSelf] {
192 afterTimeoutCb(interest, weakSelf);
195 if (isRetransmission) {
196 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
201 pendingInterest, timeoutEvent};
202 bool isNew = m_pendingSegments.emplace(segNum,
std::move(pendingSegment)).second;
204 m_highInterest = segNum;
208 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
209 const weak_ptr<SegmentFetcher>& weakSelf)
211 if (shouldStop(weakSelf))
214 BOOST_ASSERT(m_nSegmentsInFlight > 0);
215 m_nSegmentsInFlight--;
218 if (!currentSegmentComponent.
isSegment()) {
222 uint64_t currentSegment = currentSegmentComponent.
toSegment();
225 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
226 if (m_receivedSegments.size() > 0) {
227 pendingSegmentIt = m_pendingSegments.find(currentSegment);
230 pendingSegmentIt = m_pendingSegments.begin();
233 if (pendingSegmentIt == m_pendingSegments.end()) {
237 pendingSegmentIt->second.timeoutEvent.cancel();
241 m_validator.validate(data,
242 [=] (
const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
243 [=] (
const Data& d,
const auto& error) { afterValidationFailure(d, error, weakSelf); });
247 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
248 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
249 const weak_ptr<SegmentFetcher>& weakSelf)
251 if (shouldStop(weakSelf))
262 m_receivedSegments.insert(currentSegment);
265 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
266 BOOST_ASSERT(m_nSegmentsInFlight >= 0);
267 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
268 static_cast<size_t>(m_nSegmentsInFlight) + 1);
272 m_pendingSegments.erase(pendingSegmentIt);
275 auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
276 std::forward_as_tuple(currentSegment),
279 receivedSegmentIt.first->second.begin());
286 "Received FinalBlockId did not contain a segment component");
289 if (data.
getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
291 cancelExcessInFlightSegments();
295 if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
297 onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
298 m_segmentBuffer.erase(m_nextSegmentInOrder++);
299 }
while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
302 if (m_receivedSegments.size() == 1) {
304 if (currentSegment == 0) {
310 if (m_highData < currentSegment) {
311 m_highData = currentSegment;
321 fetchSegmentsInWindow(origInterest);
325 SegmentFetcher::afterValidationFailure(
const Data&,
326 const security::ValidationError& error,
327 const weak_ptr<SegmentFetcher>& weakSelf)
329 if (shouldStop(weakSelf))
336 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
337 const weak_ptr<SegmentFetcher>& weakSelf)
339 if (shouldStop(weakSelf))
344 BOOST_ASSERT(m_nSegmentsInFlight > 0);
345 m_nSegmentsInFlight--;
350 afterNackOrTimeout(origInterest);
359 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
360 const weak_ptr<SegmentFetcher>& weakSelf)
362 if (shouldStop(weakSelf))
367 BOOST_ASSERT(m_nSegmentsInFlight > 0);
368 m_nSegmentsInFlight--;
369 afterNackOrTimeout(origInterest);
373 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
381 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
382 BOOST_ASSERT(m_pendingSegments.size() > 0);
384 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
385 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
388 BOOST_ASSERT(m_pendingSegments.size() > 0);
389 pendingSegmentIt = m_pendingSegments.begin();
393 pendingSegmentIt->second.timeoutEvent.cancel();
394 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
396 m_rttEstimator.backoffRto();
398 if (m_receivedSegments.size() == 0) {
400 fetchFirstSegment(origInterest,
true);
404 m_retxQueue.push(pendingSegmentIt->first);
405 fetchSegmentsInWindow(origInterest);
410 SegmentFetcher::finalizeFetch()
412 if (m_options.inOrder) {
419 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
421 for (int64_t i = 0; i < m_nSegments; i++) {
422 buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].
size());
430 SegmentFetcher::windowIncrease()
432 if (m_options.useConstantCwnd) {
433 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
437 if (m_cwnd < m_ssthresh) {
438 m_cwnd += m_options.aiStep;
441 m_cwnd += m_options.aiStep / std::floor(m_cwnd);
446 SegmentFetcher::windowDecrease()
448 if (m_options.disableCwa || m_highData > m_recPoint) {
449 m_recPoint = m_highInterest;
451 if (m_options.useConstantCwnd) {
452 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
457 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef);
458 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
463 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
470 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
474 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
475 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
476 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
477 pendingSegmentIt->second.state = SegmentState::Retransmitted;
478 pendingSegmentIt->second.hdl = pendingInterest;
479 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
483 SegmentFetcher::cancelExcessInFlightSegments()
485 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
486 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
487 it = m_pendingSegments.erase(it);
488 BOOST_ASSERT(m_nSegmentsInFlight > 0);
489 m_nSegmentsInFlight--;
498 SegmentFetcher::checkAllSegmentsReceived()
500 bool haveReceivedAllSegments =
false;
502 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
503 haveReceivedAllSegments =
true;
505 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
506 if (m_receivedSegments.count(i) == 0) {
508 haveReceivedAllSegments =
false;
513 return haveReceivedAllSegments;
517 SegmentFetcher::getEstimatedRto()
521 return std::min(m_options.maxTimeout,
522 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Copyright (c) 2011-2015 Regents of the University of California.
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options=Options())
Initiates segment fetching.
const Component & get(ssize_t i) const
Returns an immutable reference to the component at the specified index.
static time_point now() noexcept
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
const_iterator value_begin() const noexcept
Get begin iterator of TLV-VALUE.
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
size_t value_size() const noexcept
Return the size of TLV-VALUE, i.e., the TLV-LENGTH.
Represents an Interest packet.
Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
void stop()
Stops fetching.
A handle for a scheduled event.
represents a Network Nack
NackReason getReason() const
double aiStep
additive increase step (in segments)
uint64_t getCongestionMark() const
get the value of the CongestionMark tag
One of the retrieved segments failed user-provided validation.
Handle for a pending Interest.
bool isSegment() const
Check if the component is a segment number per NDN naming conventions.
Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
One of the retrieved Data packets lacked a segment number in the last Name component (excl...
const Name & getName() const noexcept
Get name.
double initSsthresh
initial slow start threshold
double initCwnd
initial congestion window size
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
const Block & getContent() const noexcept
Get the Content element.
const Name & getName() const noexcept
Interest & setInterestLifetime(time::milliseconds lifetime)
Set the Interest's lifetime.
implements an output stream that constructs ndn::Buffer
span_constexpr std::size_t size(span< T, Extent > const &spn)
const_iterator value_end() const noexcept
Get end iterator of TLV-VALUE.
Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
Represents a Data packet.
A received FinalBlockId did not contain a segment component.
const optional< name::Component > & getFinalBlock() const
Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.
boost::chrono::milliseconds milliseconds
Interest & setName(const Name &name)
Set the Interest's name.