NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.5: NDN, CCN, CCNx, content centric networks
API Documentation
segment-fetcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2021 Regents of the University of California,
4  * Colorado State University,
5  * University Pierre & Marie Curie, Sorbonne University.
6  *
7  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8  *
9  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10  * terms of the GNU Lesser General Public License as published by the Free Software
11  * Foundation, either version 3 of the License, or (at your option) any later version.
12  *
13  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16  *
17  * You should have received copies of the GNU General Public License and GNU Lesser
18  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19  * <http://www.gnu.org/licenses/>.
20  *
21  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22  */
23 
27 #include "ndn-cxx/lp/nack.hpp"
29 
30 #include <boost/asio/io_service.hpp>
31 #include <boost/lexical_cast.hpp>
32 #include <boost/range/adaptor/map.hpp>
33 
34 #include <cmath>
35 
36 namespace ndn {
37 namespace util {
38 
39 constexpr double SegmentFetcher::MIN_SSTHRESH;
40 
41 void
43 {
44  if (maxTimeout < 1_ms) {
45  NDN_THROW(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
46  }
47 
48  if (initCwnd < 1.0) {
49  NDN_THROW(std::invalid_argument("initCwnd must be greater than or equal to 1"));
50  }
51 
52  if (aiStep < 0.0) {
53  NDN_THROW(std::invalid_argument("aiStep must be greater than or equal to 0"));
54  }
55 
56  if (mdCoef < 0.0 || mdCoef > 1.0) {
57  NDN_THROW(std::invalid_argument("mdCoef must be in range [0, 1]"));
58  }
59 }
60 
61 SegmentFetcher::SegmentFetcher(Face& face,
62  security::Validator& validator,
63  const SegmentFetcher::Options& options)
64  : m_options(options)
65  , m_face(face)
66  , m_scheduler(m_face.getIoService())
67  , m_validator(validator)
68  , m_rttEstimator(make_shared<RttEstimator::Options>(options.rttOptions))
69  , m_timeLastSegmentReceived(time::steady_clock::now())
70  , m_cwnd(options.initCwnd)
71  , m_ssthresh(options.initSsthresh)
72 {
73  m_options.validate();
74 }
75 
76 shared_ptr<SegmentFetcher>
78  const Interest& baseInterest,
79  security::Validator& validator,
80  const SegmentFetcher::Options& options)
81 {
82  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
83  fetcher->m_this = fetcher;
84  fetcher->fetchFirstSegment(baseInterest, false);
85  return fetcher;
86 }
87 
88 void
90 {
91  if (!m_this) {
92  return;
93  }
94 
95  m_pendingSegments.clear(); // cancels pending Interests and timeout events
96  m_scheduler.schedule(0_s, [self = std::move(m_this)] {});
97 }
98 
99 bool
100 SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
101 {
102  auto self = weakSelf.lock();
103  return self == nullptr || self->m_this == nullptr;
104 }
105 
106 void
107 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
108 {
109  Interest interest(baseInterest);
110  interest.setCanBePrefix(true);
111  interest.setMustBeFresh(true);
112  interest.setInterestLifetime(m_options.interestLifetime);
113  if (isRetransmission) {
114  interest.refreshNonce();
115  }
116 
117  sendInterest(0, interest, isRetransmission);
118 }
119 
120 void
121 SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
122 {
123  if (checkAllSegmentsReceived()) {
124  // All segments have been retrieved
125  return finalizeFetch();
126  }
127 
128  int64_t availableWindowSize;
129  if (m_options.inOrder) {
130  availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
131  }
132  else {
133  availableWindowSize = static_cast<int64_t>(m_cwnd);
134  }
135  availableWindowSize -= m_nSegmentsInFlight;
136 
137  std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
138 
139  while (availableWindowSize > 0) {
140  if (!m_retxQueue.empty()) {
141  auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
142  m_retxQueue.pop();
143  if (pendingSegmentIt == m_pendingSegments.end()) {
144  // Skip re-requesting this segment, since it was received after RTO timeout
145  continue;
146  }
147  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
148  segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
149  }
150  else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
151  if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
152  // Don't request a segment a second time if received in response to first "discovery" Interest
153  m_nextSegmentNum++;
154  continue;
155  }
156  segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
157  }
158  else {
159  break;
160  }
161  availableWindowSize--;
162  }
163 
164  for (const auto& segment : segmentsToRequest) {
165  Interest interest(origInterest); // to preserve Interest elements
166  interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
167  interest.setCanBePrefix(false);
168  interest.setMustBeFresh(false);
169  interest.setInterestLifetime(m_options.interestLifetime);
170  interest.refreshNonce();
171  sendInterest(segment.first, interest, segment.second);
172  }
173 }
174 
175 void
176 SegmentFetcher::sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission)
177 {
178  weak_ptr<SegmentFetcher> weakSelf = m_this;
179 
180  ++m_nSegmentsInFlight;
181  auto pendingInterest = m_face.expressInterest(interest,
182  [this, weakSelf] (const Interest& interest, const Data& data) {
183  afterSegmentReceivedCb(interest, data, weakSelf);
184  },
185  [this, weakSelf] (const Interest& interest, const lp::Nack& nack) {
186  afterNackReceivedCb(interest, nack, weakSelf);
187  },
188  nullptr);
189 
190  auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
191  auto timeoutEvent = m_scheduler.schedule(timeout, [this, interest, weakSelf] {
192  afterTimeoutCb(interest, weakSelf);
193  });
194 
195  if (isRetransmission) {
196  updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
197  return;
198  }
199 
200  PendingSegment pendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
201  pendingInterest, timeoutEvent};
202  bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
203  BOOST_VERIFY(isNew);
204  m_highInterest = segNum;
205 }
206 
207 void
208 SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
209  const weak_ptr<SegmentFetcher>& weakSelf)
210 {
211  if (shouldStop(weakSelf))
212  return;
213 
214  BOOST_ASSERT(m_nSegmentsInFlight > 0);
215  m_nSegmentsInFlight--;
216 
217  name::Component currentSegmentComponent = data.getName().get(-1);
218  if (!currentSegmentComponent.isSegment()) {
219  return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
220  }
221 
222  uint64_t currentSegment = currentSegmentComponent.toSegment();
223 
224  // The first received Interest could have any segment ID
225  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
226  if (m_receivedSegments.size() > 0) {
227  pendingSegmentIt = m_pendingSegments.find(currentSegment);
228  }
229  else {
230  pendingSegmentIt = m_pendingSegments.begin();
231  }
232 
233  if (pendingSegmentIt == m_pendingSegments.end()) {
234  return;
235  }
236 
237  pendingSegmentIt->second.timeoutEvent.cancel();
238 
239  afterSegmentReceived(data);
240 
241  m_validator.validate(data,
242  [=] (const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
243  [=] (const Data& d, const auto& error) { afterValidationFailure(d, error, weakSelf); });
244 }
245 
246 void
247 SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
248  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
249  const weak_ptr<SegmentFetcher>& weakSelf)
250 {
251  if (shouldStop(weakSelf))
252  return;
253 
254  // We update the last receive time here instead of in the segment received callback so that the
255  // transfer will not fail to terminate if we only received invalid Data packets.
256  m_timeLastSegmentReceived = time::steady_clock::now();
257 
258  m_nReceived++;
259 
260  // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
261  uint64_t currentSegment = data.getName().get(-1).toSegment();
262  m_receivedSegments.insert(currentSegment);
263 
264  // Add measurement to RTO estimator (if not retransmission)
265  if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
266  BOOST_ASSERT(m_nSegmentsInFlight >= 0);
267  m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
268  static_cast<size_t>(m_nSegmentsInFlight) + 1);
269  }
270 
271  // Remove from pending segments map
272  m_pendingSegments.erase(pendingSegmentIt);
273 
274  // Copy data in segment to temporary buffer
275  auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
276  std::forward_as_tuple(currentSegment),
277  std::forward_as_tuple(data.getContent().value_size()));
278  std::copy(data.getContent().value_begin(), data.getContent().value_end(),
279  receivedSegmentIt.first->second.begin());
280  m_nBytesReceived += data.getContent().value_size();
281  afterSegmentValidated(data);
282 
283  if (data.getFinalBlock()) {
284  if (!data.getFinalBlock()->isSegment()) {
285  return signalError(FINALBLOCKID_NOT_SEGMENT,
286  "Received FinalBlockId did not contain a segment component");
287  }
288 
289  if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
290  m_nSegments = data.getFinalBlock()->toSegment() + 1;
291  cancelExcessInFlightSegments();
292  }
293  }
294 
295  if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
296  do {
297  onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
298  m_segmentBuffer.erase(m_nextSegmentInOrder++);
299  } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
300  }
301 
302  if (m_receivedSegments.size() == 1) {
303  m_versionedDataName = data.getName().getPrefix(-1);
304  if (currentSegment == 0) {
305  // We received the first segment in response, so we can increment the next segment number
306  m_nextSegmentNum++;
307  }
308  }
309 
310  if (m_highData < currentSegment) {
311  m_highData = currentSegment;
312  }
313 
314  if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
315  windowDecrease();
316  }
317  else {
318  windowIncrease();
319  }
320 
321  fetchSegmentsInWindow(origInterest);
322 }
323 
324 void
325 SegmentFetcher::afterValidationFailure(const Data&,
326  const security::ValidationError& error,
327  const weak_ptr<SegmentFetcher>& weakSelf)
328 {
329  if (shouldStop(weakSelf))
330  return;
331 
332  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
333 }
334 
335 void
336 SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
337  const weak_ptr<SegmentFetcher>& weakSelf)
338 {
339  if (shouldStop(weakSelf))
340  return;
341 
343 
344  BOOST_ASSERT(m_nSegmentsInFlight > 0);
345  m_nSegmentsInFlight--;
346 
347  switch (nack.getReason()) {
350  afterNackOrTimeout(origInterest);
351  break;
352  default:
353  signalError(NACK_ERROR, "Nack Error");
354  break;
355  }
356 }
357 
358 void
359 SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
360  const weak_ptr<SegmentFetcher>& weakSelf)
361 {
362  if (shouldStop(weakSelf))
363  return;
364 
366 
367  BOOST_ASSERT(m_nSegmentsInFlight > 0);
368  m_nSegmentsInFlight--;
369  afterNackOrTimeout(origInterest);
370 }
371 
372 void
373 SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
374 {
375  if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
376  // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
377  return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
378  }
379 
380  name::Component lastNameComponent = origInterest.getName().get(-1);
381  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
382  BOOST_ASSERT(m_pendingSegments.size() > 0);
383  if (lastNameComponent.isSegment()) {
384  BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
385  pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
386  }
387  else { // First Interest
388  BOOST_ASSERT(m_pendingSegments.size() > 0);
389  pendingSegmentIt = m_pendingSegments.begin();
390  }
391 
392  // Cancel timeout event and set status to InRetxQueue
393  pendingSegmentIt->second.timeoutEvent.cancel();
394  pendingSegmentIt->second.state = SegmentState::InRetxQueue;
395 
396  m_rttEstimator.backoffRto();
397 
398  if (m_receivedSegments.size() == 0) {
399  // Resend first Interest (until maximum receive timeout exceeded)
400  fetchFirstSegment(origInterest, true);
401  }
402  else {
403  windowDecrease();
404  m_retxQueue.push(pendingSegmentIt->first);
405  fetchSegmentsInWindow(origInterest);
406  }
407 }
408 
409 void
410 SegmentFetcher::finalizeFetch()
411 {
412  if (m_options.inOrder) {
414  }
415  else {
416  // Combine segments into final buffer
417  OBufferStream buf;
418  // We may have received more segments than exist in the object.
419  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
420 
421  for (int64_t i = 0; i < m_nSegments; i++) {
422  buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
423  }
424  onComplete(buf.buf());
425  }
426  stop();
427 }
428 
429 void
430 SegmentFetcher::windowIncrease()
431 {
432  if (m_options.useConstantCwnd) {
433  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
434  return;
435  }
436 
437  if (m_cwnd < m_ssthresh) {
438  m_cwnd += m_options.aiStep; // additive increase
439  }
440  else {
441  m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
442  }
443 }
444 
445 void
446 SegmentFetcher::windowDecrease()
447 {
448  if (m_options.disableCwa || m_highData > m_recPoint) {
449  m_recPoint = m_highInterest;
450 
451  if (m_options.useConstantCwnd) {
452  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
453  return;
454  }
455 
456  // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
457  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
458  m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
459  }
460 }
461 
462 void
463 SegmentFetcher::signalError(uint32_t code, const std::string& msg)
464 {
465  onError(code, msg);
466  stop();
467 }
468 
469 void
470 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
471  const PendingInterestHandle& pendingInterest,
472  scheduler::EventId timeoutEvent)
473 {
474  auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
475  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
476  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
477  pendingSegmentIt->second.state = SegmentState::Retransmitted;
478  pendingSegmentIt->second.hdl = pendingInterest; // cancels previous pending Interest via scoped handle
479  pendingSegmentIt->second.timeoutEvent = timeoutEvent;
480 }
481 
482 void
483 SegmentFetcher::cancelExcessInFlightSegments()
484 {
485  for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
486  if (it->first >= static_cast<uint64_t>(m_nSegments)) {
487  it = m_pendingSegments.erase(it); // cancels pending Interest and timeout event
488  BOOST_ASSERT(m_nSegmentsInFlight > 0);
489  m_nSegmentsInFlight--;
490  }
491  else {
492  ++it;
493  }
494  }
495 }
496 
497 bool
498 SegmentFetcher::checkAllSegmentsReceived()
499 {
500  bool haveReceivedAllSegments = false;
501 
502  if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
503  haveReceivedAllSegments = true;
504  // Verify that all segments in window have been received. If not, send Interests for missing segments.
505  for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
506  if (m_receivedSegments.count(i) == 0) {
507  m_retxQueue.push(i);
508  haveReceivedAllSegments = false;
509  }
510  }
511  }
512 
513  return haveReceivedAllSegments;
514 }
515 
517 SegmentFetcher::getEstimatedRto()
518 {
519  // We don't want an Interest timeout greater than the maximum allowed timeout between the
520  // succesful receipt of segments
521  return std::min(m_options.maxTimeout,
522  time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
523 }
524 
525 } // namespace util
526 } // namespace ndn
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Definition: name.hpp:209
Copyright (c) 2011-2015 Regents of the University of California.
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
Definition: interest.hpp:214
ndn security Validator
Definition: validator.cpp:32
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options=Options())
Initiates segment fetching.
const Component & get(ssize_t i) const
Returns an immutable reference to the component at the specified index.
Definition: name.hpp:162
static time_point now() noexcept
Definition: time.cpp:80
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
Definition: interest.cpp:422
const_iterator value_begin() const noexcept
Get begin iterator of TLV-VALUE.
Definition: block.hpp:301
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
size_t value_size() const noexcept
Return the size of TLV-VALUE, i.e., the TLV-LENGTH.
Definition: block.hpp:321
Represents an Interest packet.
Definition: interest.hpp:48
Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in &#39;in order&#39; mode.
void stop()
Stops fetching.
A handle for a scheduled event.
Definition: scheduler.hpp:58
represents a Network Nack
Definition: nack.hpp:38
#define NDN_THROW(e)
Definition: exception.hpp:61
NackReason getReason() const
Definition: nack.hpp:90
double aiStep
additive increase step (in segments)
uint64_t getCongestionMark() const
get the value of the CongestionMark tag
Definition: packet-base.cpp:28
One of the retrieved segments failed user-provided validation.
Handle for a pending Interest.
Definition: face.hpp:437
bool isSegment() const
Check if the component is a segment number per NDN naming conventions.
Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:90
Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
One of the retrieved Data packets lacked a segment number in the last Name component (excl...
const Name & getName() const noexcept
Get name.
Definition: data.hpp:127
double initSsthresh
initial slow start threshold
double initCwnd
initial congestion window size
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
const Block & getContent() const noexcept
Get the Content element.
Definition: data.hpp:175
const Name & getName() const noexcept
Definition: interest.hpp:172
Interest & setInterestLifetime(time::milliseconds lifetime)
Set the Interest&#39;s lifetime.
Definition: interest.cpp:435
implements an output stream that constructs ndn::Buffer
span_constexpr std::size_t size(span< T, Extent > const &spn)
Definition: span-lite.hpp:1535
const_iterator value_end() const noexcept
Get end iterator of TLV-VALUE.
Definition: block.hpp:311
Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
Represents a Data packet.
Definition: data.hpp:37
A received FinalBlockId did not contain a segment component.
const optional< name::Component > & getFinalBlock() const
Definition: data.hpp:293
Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.
Definition: interest.hpp:195
boost::chrono::milliseconds milliseconds
Definition: time.hpp:48
Interest & setName(const Name &name)
Set the Interest&#39;s name.
Definition: interest.cpp:367