NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.5: NDN, CCN, CCNx, content centric networks
API Documentation
segment-fetcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2019, Regents of the University of California,
4  * Colorado State University,
5  * University Pierre & Marie Curie, Sorbonne University.
6  *
7  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8  *
9  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10  * terms of the GNU Lesser General Public License as published by the Free Software
11  * Foundation, either version 3 of the License, or (at your option) any later version.
12  *
13  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16  *
17  * You should have received copies of the GNU General Public License and GNU Lesser
18  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19  * <http://www.gnu.org/licenses/>.
20  *
21  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22  *
23  * @author Shuo Yang
24  * @author Weiwei Liu
25  * @author Chavoosh Ghasemi
26  */
27 
31 #include "ndn-cxx/lp/nack.hpp"
33 
34 #include <boost/asio/io_service.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/range/adaptor/map.hpp>
37 
38 #include <cmath>
39 
40 namespace ndn {
41 namespace util {
42 
43 constexpr double SegmentFetcher::MIN_SSTHRESH;
44 
45 void
47 {
48  if (maxTimeout < 1_ms) {
49  BOOST_THROW_EXCEPTION(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
50  }
51 
52  if (initCwnd < 1.0) {
53  BOOST_THROW_EXCEPTION(std::invalid_argument("initCwnd must be greater than or equal to 1"));
54  }
55 
56  if (aiStep < 0.0) {
57  BOOST_THROW_EXCEPTION(std::invalid_argument("aiStep must be greater than or equal to 0"));
58  }
59 
60  if (mdCoef < 0.0 || mdCoef > 1.0) {
61  BOOST_THROW_EXCEPTION(std::invalid_argument("mdCoef must be in range [0, 1]"));
62  }
63 }
64 
65 SegmentFetcher::SegmentFetcher(Face& face,
66  security::v2::Validator& validator,
67  const SegmentFetcher::Options& options)
68  : m_options(options)
69  , m_face(face)
70  , m_scheduler(m_face.getIoService())
71  , m_validator(validator)
72  , m_rttEstimator(options.rttOptions)
73  , m_timeLastSegmentReceived(time::steady_clock::now())
74  , m_nextSegmentNum(0)
75  , m_cwnd(options.initCwnd)
76  , m_ssthresh(options.initSsthresh)
77  , m_nSegmentsInFlight(0)
78  , m_nSegments(0)
79  , m_highInterest(0)
80  , m_highData(0)
81  , m_recPoint(0)
82  , m_nReceived(0)
83  , m_nBytesReceived(0)
84 {
85  m_options.validate();
86 }
87 
88 shared_ptr<SegmentFetcher>
90  const Interest& baseInterest,
91  security::v2::Validator& validator,
92  const SegmentFetcher::Options& options)
93 {
94  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
95  fetcher->m_this = fetcher;
96  fetcher->fetchFirstSegment(baseInterest, false);
97  return fetcher;
98 }
99 
100 void
102 {
103  if (!m_this) {
104  return;
105  }
106 
107  for (const auto& pendingSegment : m_pendingSegments | boost::adaptors::map_values) {
108  m_face.removePendingInterest(pendingSegment.id);
109  if (pendingSegment.timeoutEvent) {
110  m_scheduler.cancelEvent(pendingSegment.timeoutEvent);
111  }
112  }
113  m_scheduler.scheduleEvent(0_s, [self = std::move(m_this)] {});
114 }
115 
116 bool
117 SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
118 {
119  auto self = weakSelf.lock();
120  return self == nullptr || self->m_this == nullptr;
121 }
122 
123 void
124 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
125 {
126  Interest interest(baseInterest);
127  interest.setCanBePrefix(true);
128  interest.setMustBeFresh(true);
129  interest.setInterestLifetime(m_options.interestLifetime);
130  if (isRetransmission) {
131  interest.refreshNonce();
132  }
133 
134  weak_ptr<SegmentFetcher> weakSelf = m_this;
135 
136  m_nSegmentsInFlight++;
137  auto pendingInterest = m_face.expressInterest(interest,
138  bind(&SegmentFetcher::afterSegmentReceivedCb,
139  this, _1, _2, weakSelf),
140  bind(&SegmentFetcher::afterNackReceivedCb,
141  this, _1, _2, weakSelf),
142  nullptr);
143  auto timeoutEvent =
144  m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
145  bind(&SegmentFetcher::afterTimeoutCb, this, interest, weakSelf));
146 
147  if (isRetransmission) {
148  updateRetransmittedSegment(0, pendingInterest, timeoutEvent);
149  }
150  else {
151  BOOST_ASSERT(m_pendingSegments.count(0) == 0);
152  m_pendingSegments.emplace(0, PendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
153  pendingInterest, timeoutEvent});
154  }
155 }
156 
157 void
158 SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
159 {
160  weak_ptr<SegmentFetcher> weakSelf = m_this;
161 
162  if (checkAllSegmentsReceived()) {
163  // All segments have been retrieved
164  return finalizeFetch();
165  }
166 
167  int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
168  std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
169 
170  while (availableWindowSize > 0) {
171  if (!m_retxQueue.empty()) {
172  auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
173  m_retxQueue.pop();
174  if (pendingSegmentIt == m_pendingSegments.end()) {
175  // Skip re-requesting this segment, since it was received after RTO timeout
176  continue;
177  }
178  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
179  segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
180  }
181  else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
182  if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
183  // Don't request a segment a second time if received in response to first "discovery" Interest
184  m_nextSegmentNum++;
185  continue;
186  }
187  segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
188  }
189  else {
190  break;
191  }
192  availableWindowSize--;
193  }
194 
195  for (const auto& segment : segmentsToRequest) {
196  Interest interest(origInterest); // to preserve Interest elements
197  interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
198  interest.setCanBePrefix(false);
199  interest.setMustBeFresh(false);
200  interest.setInterestLifetime(m_options.interestLifetime);
201  interest.refreshNonce();
202 
203  m_nSegmentsInFlight++;
204  auto pendingInterest = m_face.expressInterest(interest,
205  bind(&SegmentFetcher::afterSegmentReceivedCb,
206  this, _1, _2, weakSelf),
207  bind(&SegmentFetcher::afterNackReceivedCb,
208  this, _1, _2, weakSelf),
209  nullptr);
210  auto timeoutEvent =
211  m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
212  bind(&SegmentFetcher::afterTimeoutCb, this, interest, weakSelf));
213 
214  if (segment.second) { // Retransmission
215  updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
216  }
217  else { // First request for segment
218  BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
219  m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
220  time::steady_clock::now(),
221  pendingInterest, timeoutEvent});
222  m_highInterest = segment.first;
223  }
224  }
225 }
226 
227 void
228 SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
229  const weak_ptr<SegmentFetcher>& weakSelf)
230 {
231  if (shouldStop(weakSelf))
232  return;
233 
234  BOOST_ASSERT(m_nSegmentsInFlight > 0);
235  m_nSegmentsInFlight--;
236 
237  name::Component currentSegmentComponent = data.getName().get(-1);
238  if (!currentSegmentComponent.isSegment()) {
239  return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
240  }
241 
242  uint64_t currentSegment = currentSegmentComponent.toSegment();
243 
244  // The first received Interest could have any segment ID
246  if (m_receivedSegments.size() > 0) {
247  pendingSegmentIt = m_pendingSegments.find(currentSegment);
248  }
249  else {
250  pendingSegmentIt = m_pendingSegments.begin();
251  }
252 
253  if (pendingSegmentIt == m_pendingSegments.end()) {
254  return;
255  }
256 
257  afterSegmentReceived(data);
258 
259  // Cancel timeout event
260  pendingSegmentIt->second.timeoutEvent.cancel();
261 
262  m_validator.validate(data,
263  bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
264  pendingSegmentIt, weakSelf),
265  bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, weakSelf));
266 }
267 
268 void
269 SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
271  const weak_ptr<SegmentFetcher>& weakSelf)
272 {
273  if (shouldStop(weakSelf))
274  return;
275 
276  // We update the last receive time here instead of in the segment received callback so that the
277  // transfer will not fail to terminate if we only received invalid Data packets.
278  m_timeLastSegmentReceived = time::steady_clock::now();
279 
280  m_nReceived++;
281 
282  // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
283  uint64_t currentSegment = data.getName().get(-1).toSegment();
284  // Add measurement to RTO estimator (if not retransmission)
285  if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
286  m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
287  std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
288  }
289 
290  // Remove from pending segments map
291  m_pendingSegments.erase(pendingSegmentIt);
292 
293  // Copy data in segment to temporary buffer
294  auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
295  std::forward_as_tuple(currentSegment),
296  std::forward_as_tuple(data.getContent().value_size()));
297  std::copy(data.getContent().value_begin(), data.getContent().value_end(),
298  receivedSegmentIt.first->second.begin());
299  m_nBytesReceived += data.getContent().value_size();
300  afterSegmentValidated(data);
301 
302  if (data.getFinalBlock()) {
303  if (!data.getFinalBlock()->isSegment()) {
304  return signalError(FINALBLOCKID_NOT_SEGMENT,
305  "Received FinalBlockId did not contain a segment component");
306  }
307 
308  if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
309  m_nSegments = data.getFinalBlock()->toSegment() + 1;
310  cancelExcessInFlightSegments();
311  }
312  }
313 
314  if (m_receivedSegments.size() == 1) {
315  m_versionedDataName = data.getName().getPrefix(-1);
316  if (currentSegment == 0) {
317  // We received the first segment in response, so we can increment the next segment number
318  m_nextSegmentNum++;
319  }
320  }
321 
322  if (m_highData < currentSegment) {
323  m_highData = currentSegment;
324  }
325 
326  if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
327  windowDecrease();
328  }
329  else {
330  windowIncrease();
331  }
332 
333  fetchSegmentsInWindow(origInterest);
334 }
335 
336 void
337 SegmentFetcher::afterValidationFailure(const Data& data,
338  const security::v2::ValidationError& error,
339  const weak_ptr<SegmentFetcher>& weakSelf)
340 {
341  if (shouldStop(weakSelf))
342  return;
343 
344  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
345 }
346 
347 void
348 SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
349  const weak_ptr<SegmentFetcher>& weakSelf)
350 {
351  if (shouldStop(weakSelf))
352  return;
353 
355 
356  BOOST_ASSERT(m_nSegmentsInFlight > 0);
357  m_nSegmentsInFlight--;
358 
359  switch (nack.getReason()) {
362  afterNackOrTimeout(origInterest);
363  break;
364  default:
365  signalError(NACK_ERROR, "Nack Error");
366  break;
367  }
368 }
369 
370 void
371 SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
372  const weak_ptr<SegmentFetcher>& weakSelf)
373 {
374  if (shouldStop(weakSelf))
375  return;
376 
378 
379  BOOST_ASSERT(m_nSegmentsInFlight > 0);
380  m_nSegmentsInFlight--;
381  afterNackOrTimeout(origInterest);
382 }
383 
384 void
385 SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
386 {
387  if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
388  // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
389  return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
390  }
391 
392  name::Component lastNameComponent = origInterest.getName().get(-1);
394  BOOST_ASSERT(m_pendingSegments.size() > 0);
395  if (lastNameComponent.isSegment()) {
396  BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
397  pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
398  }
399  else { // First Interest
400  BOOST_ASSERT(m_pendingSegments.size() > 0);
401  pendingSegmentIt = m_pendingSegments.begin();
402  }
403 
404  // Cancel timeout event and set status to InRetxQueue
405  pendingSegmentIt->second.timeoutEvent.cancel();
406  pendingSegmentIt->second.state = SegmentState::InRetxQueue;
407 
408  m_rttEstimator.backoffRto();
409 
410  if (m_receivedSegments.size() == 0) {
411  // Resend first Interest (until maximum receive timeout exceeded)
412  fetchFirstSegment(origInterest, true);
413  }
414  else {
415  windowDecrease();
416  m_retxQueue.push(pendingSegmentIt->first);
417  fetchSegmentsInWindow(origInterest);
418  }
419 }
420 
421 void
422 SegmentFetcher::finalizeFetch()
423 {
424  // Combine segments into final buffer
425  OBufferStream buf;
426  // We may have received more segments than exist in the object.
427  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
428 
429  for (int64_t i = 0; i < m_nSegments; i++) {
430  buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
431  }
432 
433  onComplete(buf.buf());
434  stop();
435 }
436 
437 void
438 SegmentFetcher::windowIncrease()
439 {
440  if (m_options.useConstantCwnd) {
441  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
442  return;
443  }
444 
445  if (m_cwnd < m_ssthresh) {
446  m_cwnd += m_options.aiStep; // additive increase
447  }
448  else {
449  m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
450  }
451 }
452 
453 void
454 SegmentFetcher::windowDecrease()
455 {
456  if (m_options.disableCwa || m_highData > m_recPoint) {
457  m_recPoint = m_highInterest;
458 
459  if (m_options.useConstantCwnd) {
460  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
461  return;
462  }
463 
464  // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
465  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
466  m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
467  }
468 }
469 
470 void
471 SegmentFetcher::signalError(uint32_t code, const std::string& msg)
472 {
473  onError(code, msg);
474  stop();
475 }
476 
477 void
478 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
479  const PendingInterestId* pendingInterest,
480  scheduler::EventId timeoutEvent)
481 {
482  auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
483  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
484  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
485  pendingSegmentIt->second.state = SegmentState::Retransmitted;
486  m_face.removePendingInterest(pendingSegmentIt->second.id);
487  pendingSegmentIt->second.id = pendingInterest;
488  pendingSegmentIt->second.timeoutEvent = timeoutEvent;
489 }
490 
491 void
492 SegmentFetcher::cancelExcessInFlightSegments()
493 {
494  for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
495  if (it->first >= static_cast<uint64_t>(m_nSegments)) {
496  m_face.removePendingInterest(it->second.id);
497  if (it->second.timeoutEvent) {
498  m_scheduler.cancelEvent(it->second.timeoutEvent);
499  }
500  it = m_pendingSegments.erase(it);
501  BOOST_ASSERT(m_nSegmentsInFlight > 0);
502  m_nSegmentsInFlight--;
503  }
504  else {
505  ++it;
506  }
507  }
508 }
509 
510 bool
511 SegmentFetcher::checkAllSegmentsReceived()
512 {
513  bool haveReceivedAllSegments = false;
514 
515  if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
516  haveReceivedAllSegments = true;
517  // Verify that all segments in window have been received. If not, send Interests for missing segments.
518  for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
519  if (m_receivedSegments.count(i) == 0) {
520  m_retxQueue.push(i);
521  haveReceivedAllSegments = false;
522  }
523  }
524  }
525 
526  return haveReceivedAllSegments;
527 }
528 
529 time::milliseconds
530 SegmentFetcher::getEstimatedRto()
531 {
532  // We don't want an Interest timeout greater than the maximum allowed timeout between the
533  // succesful receipt of segments
534  return std::min(m_options.maxTimeout,
535  time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
536 }
537 
538 } // namespace util
539 } // namespace ndn
PartialName getPrefix(ssize_t nComponents) const
Extract a prefix of the name.
Definition: name.hpp:203
Copyright (c) 2011-2015 Regents of the University of California.
time::milliseconds interestLifetime
lifetime of sent Interests - independent of Interest timeout
static time_point now() noexcept
Definition: time.cpp:80
An unrecoverable Nack was received during retrieval.
double mdCoef
multiplicative decrease coefficient
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data.
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
bool useConstantInterestTimeout
if true, Interest timeout is kept at maxTimeout
Utility class to fetch the latest version of a segmented object.
Represents an Interest packet.
Definition: interest.hpp:44
void stop()
Stops fetching.
EventId scheduleEvent(time::nanoseconds after, const EventCallback &callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:103
void backoffRto()
Backoff RTO by a factor of Options::rtoBackoffMultiplier.
represents a Network Nack
Definition: nack.hpp:38
NackReason getReason() const
Definition: nack.hpp:90
Table::const_iterator iterator
Definition: cs-internal.hpp:41
double aiStep
additive increase step (in segments)
bool disableCwa
disable Conservative Window Adaptation
void addMeasurement(MillisecondsDouble rtt, size_t nExpectedSamples)
Add a new RTT measurement to the estimator.
const Block & get(uint32_t type) const
Get the first sub element of specified TLV-TYPE.
Definition: block.cpp:422
One of the retrieved segments failed user-provided validation.
bool isSegment() const
Check if the component is segment number per NDN naming conventions.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked.
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
Definition: validator.cpp:75
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:93
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
One of the retrieved Data packets lacked a segment number in the last Name component (excl. implicit digest).
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
Definition: face.cpp:131
double initCwnd
initial congestion window size
bool ignoreCongMarks
disable window decrease after congestion mark received
Represents a name component.
bool resetCwndToInit
reduce cwnd to initCwnd when loss event occurs
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment is received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out.
void cancelEvent(const EventId &eid)
Cancel a scheduled event.
Definition: scheduler.hpp:169
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error.
A received FinalBlockId did not contain a segment component.
MillisecondsDouble getEstimatedRto() const
Returns the estimated RTO value.
bool useConstantCwnd
if true, window size is kept at initCwnd
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
Definition: face.cpp:147
Interface for validating data and interest packets.
Definition: validator.hpp:61