NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.5: NDN, CCN, CCNx, content centric networks
API Documentation
generic-link-service.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2021, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "generic-link-service.hpp"
27 
28 #include <ndn-cxx/lp/pit-token.hpp>
29 #include <ndn-cxx/lp/tags.hpp>
30 
31 #include <cmath>
32 
33 namespace nfd {
34 namespace face {
35 
37 
39  tlv::sizeOfVarNumber(sizeof(uint64_t)) + // length
40  tlv::sizeOfNonNegativeInteger(UINT64_MAX); // value
41 
42 GenericLinkService::GenericLinkService(const GenericLinkService::Options& options)
43  : m_options(options)
44  , m_fragmenter(m_options.fragmenterOptions, this)
45  , m_reassembler(m_options.reassemblerOptions, this)
46  , m_reliability(m_options.reliabilityOptions, this)
47  , m_lastSeqNo(-2)
48  , m_nextMarkTime(time::steady_clock::time_point::max())
49  , m_nMarkedSinceInMarkingState(0)
50 {
51  m_reassembler.beforeTimeout.connect([this] (auto&&...) { ++nReassemblyTimeouts; });
52  m_reliability.onDroppedInterest.connect([this] (const auto& i) { notifyDroppedInterest(i); });
53  nReassembling.observe(&m_reassembler);
54 }
55 
56 void
57 GenericLinkService::setOptions(const GenericLinkService::Options& options)
58 {
59  m_options = options;
60  m_fragmenter.setOptions(m_options.fragmenterOptions);
61  m_reassembler.setOptions(m_options.reassemblerOptions);
62  m_reliability.setOptions(m_options.reliabilityOptions);
63 }
64 
65 ssize_t
66 GenericLinkService::getEffectiveMtu() const
67 {
68  // Since MTU_UNLIMITED is negative, it will implicitly override any finite override MTU
69  return std::min(m_options.overrideMtu, getTransport()->getMtu());
70 }
71 
72 bool
73 GenericLinkService::canOverrideMtuTo(ssize_t mtu) const
74 {
75  // Not allowed to override unlimited transport MTU
76  if (getTransport()->getMtu() == MTU_UNLIMITED) {
77  return false;
78  }
79 
80  // Override MTU must be at least MIN_MTU (also implicitly forbids MTU_UNLIMITED and MTU_INVALID)
81  return mtu >= MIN_MTU;
82 }
83 
84 void
85 GenericLinkService::requestIdlePacket()
86 {
87  // No need to request Acks to attach to this packet from LpReliability, as they are already
88  // attached in sendLpPacket
89  NFD_LOG_FACE_TRACE("IDLE packet requested");
90  this->sendLpPacket({});
91 }
92 
93 void
94 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
95 {
96  const ssize_t mtu = getEffectiveMtu();
97 
98  if (m_options.reliabilityOptions.isEnabled) {
99  m_reliability.piggyback(pkt, mtu);
100  }
101 
102  if (m_options.allowCongestionMarking) {
103  checkCongestionLevel(pkt);
104  }
105 
106  auto block = pkt.wireEncode();
107  if (mtu != MTU_UNLIMITED && block.size() > static_cast<size_t>(mtu)) {
108  ++nOutOverMtu;
109  NFD_LOG_FACE_WARN("attempted to send packet over MTU limit");
110  return;
111  }
112  this->sendPacket(block);
113 }
114 
115 void
116 GenericLinkService::doSendInterest(const Interest& interest)
117 {
118  lp::Packet lpPacket(interest.wireEncode());
119 
120  encodeLpFields(interest, lpPacket);
121 
122  this->sendNetPacket(std::move(lpPacket), true);
123 }
124 
125 void
126 GenericLinkService::doSendData(const Data& data)
127 {
128  lp::Packet lpPacket(data.wireEncode());
129 
130  encodeLpFields(data, lpPacket);
131 
132  this->sendNetPacket(std::move(lpPacket), false);
133 }
134 
135 void
136 GenericLinkService::doSendNack(const lp::Nack& nack)
137 {
138  lp::Packet lpPacket(nack.getInterest().wireEncode());
139  lpPacket.add<lp::NackField>(nack.getHeader());
140 
141  encodeLpFields(nack, lpPacket);
142 
143  this->sendNetPacket(std::move(lpPacket), false);
144 }
145 
146 void
147 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
148 {
149  std::for_each(pkts.begin(), pkts.end(), [this] (lp::Packet& pkt) {
150  pkt.set<lp::SequenceField>(++m_lastSeqNo);
151  });
152 }
153 
154 void
155 GenericLinkService::encodeLpFields(const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
156 {
157  if (m_options.allowLocalFields) {
158  auto incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
159  if (incomingFaceIdTag != nullptr) {
160  lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
161  }
162  }
163 
164  auto congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
165  if (congestionMarkTag != nullptr) {
166  lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
167  }
168 
169  if (m_options.allowSelfLearning) {
170  auto nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
171  if (nonDiscoveryTag != nullptr) {
172  lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
173  }
174 
175  auto prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
176  if (prefixAnnouncementTag != nullptr) {
177  lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
178  }
179  }
180 
181  auto pitToken = netPkt.getTag<lp::PitToken>();
182  if (pitToken != nullptr) {
183  lpPacket.add<lp::PitTokenField>(*pitToken);
184  }
185 
186  shared_ptr<lp::HopCountTag> hopCountTag = netPkt.getTag<lp::HopCountTag>();
187  if (hopCountTag != nullptr) {
188  lpPacket.add<lp::HopCountTagField>(*hopCountTag);
189  }
190  else {
191  lpPacket.add<lp::HopCountTagField>(0);
192  }
193 
194  if (m_options.enableGeoTags) {
195  auto geoTag = m_options.enableGeoTags();
196  if (geoTag != nullptr) {
197  lpPacket.add<lp::GeoTagField>(*geoTag);
198  }
199  }
200 }
201 
202 void
203 GenericLinkService::sendNetPacket(lp::Packet&& pkt, bool isInterest)
204 {
205  std::vector<lp::Packet> frags;
206  ssize_t mtu = getEffectiveMtu();
207 
208  // Make space for feature fields in fragments
209  if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
211  }
212 
213  if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
214  mtu -= CONGESTION_MARK_SIZE;
215  }
216 
217  // An MTU of 0 is allowed but will cause all packets to be dropped before transmission
218  BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu >= 0);
219 
220  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
221  bool isOk = false;
222  std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
223  if (!isOk) {
224  // fragmentation failed (warning is logged by LpFragmenter)
225  ++nFragmentationErrors;
226  return;
227  }
228  }
229  else {
230  if (m_options.reliabilityOptions.isEnabled) {
231  frags.push_back(pkt);
232  }
233  else {
234  frags.push_back(std::move(pkt));
235  }
236  }
237 
238  if (frags.size() == 1) {
239  // even if indexed fragmentation is enabled, the fragmenter should not
240  // fragment the packet if it can fit in MTU
241  BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
242  BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
243  }
244 
245  // Only assign sequences to fragments if reliability enabled or if packet contains >1 fragment
246  if (m_options.reliabilityOptions.isEnabled || frags.size() > 1) {
247  // Assign sequences to all fragments
248  this->assignSequences(frags);
249  }
250 
251  if (m_options.reliabilityOptions.isEnabled && frags.front().has<lp::FragmentField>()) {
252  m_reliability.handleOutgoing(frags, std::move(pkt), isInterest);
253  }
254 
255  for (lp::Packet& frag : frags) {
256  this->sendLpPacket(std::move(frag));
257  }
258 }
259 
260 void
261 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
262 {
263  ssize_t sendQueueLength = getTransport()->getSendQueueLength();
264  // The transport must support retrieving the current send queue length
265  if (sendQueueLength < 0) {
266  return;
267  }
268 
269  if (sendQueueLength > 0) {
270  NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" <<
271  m_options.defaultCongestionThreshold << " capacity=" <<
272  getTransport()->getSendQueueCapacity());
273  }
274 
275  // sendQueue is above target
276  if (static_cast<size_t>(sendQueueLength) > m_options.defaultCongestionThreshold) {
277  const auto now = time::steady_clock::now();
278 
279  if (m_nextMarkTime == time::steady_clock::time_point::max()) {
280  m_nextMarkTime = now + m_options.baseCongestionMarkingInterval;
281  }
282  // Mark packet if sendQueue stays above target for one interval
283  else if (now >= m_nextMarkTime) {
285  ++nCongestionMarked;
286  NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
287 
288  ++m_nMarkedSinceInMarkingState;
289  // Decrease the marking interval by the inverse of the square root of the number of packets
290  // marked in this incident of congestion
291  time::nanoseconds interval(static_cast<time::nanoseconds::rep>(
292  m_options.baseCongestionMarkingInterval.count() /
293  std::sqrt(m_nMarkedSinceInMarkingState + 1)));
294  m_nextMarkTime += interval;
295  }
296  }
297  else if (m_nextMarkTime != time::steady_clock::time_point::max()) {
298  // Congestion incident has ended, so reset
299  NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
300  m_nextMarkTime = time::steady_clock::time_point::max();
301  m_nMarkedSinceInMarkingState = 0;
302  }
303 }
304 
305 void
306 GenericLinkService::doReceivePacket(const Block& packet, const EndpointId& endpoint)
307 {
308  try {
309  lp::Packet pkt(packet);
310 
311  if (m_options.reliabilityOptions.isEnabled) {
312  if (!m_reliability.processIncomingPacket(pkt)) {
313  NFD_LOG_FACE_TRACE("received duplicate fragment: DROP");
314  ++nDuplicateSequence;
315  return;
316  }
317  }
318 
319  if (!pkt.has<lp::FragmentField>()) {
320  NFD_LOG_FACE_TRACE("received IDLE packet: DROP");
321  return;
322  }
323 
324  if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
325  !m_options.allowReassembly) {
326  NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
327  return;
328  }
329 
330  bool isReassembled = false;
331  Block netPkt;
332  lp::Packet firstPkt;
333  std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(endpoint, pkt);
334  if (isReassembled) {
335  this->decodeNetPacket(netPkt, firstPkt, endpoint);
336  }
337  }
338  catch (const tlv::Error& e) {
339  ++nInLpInvalid;
340  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
341  }
342 }
343 
344 void
345 GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt,
346  const EndpointId& endpointId)
347 {
348  try {
349  switch (netPkt.type()) {
350  case tlv::Interest:
351  if (firstPkt.has<lp::NackField>()) {
352  this->decodeNack(netPkt, firstPkt, endpointId);
353  }
354  else {
355  this->decodeInterest(netPkt, firstPkt, endpointId);
356  }
357  break;
358  case tlv::Data:
359  this->decodeData(netPkt, firstPkt, endpointId);
360  break;
361  default:
362  ++nInNetInvalid;
363  NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
364  return;
365  }
366  }
367  catch (const tlv::Error& e) {
368  ++nInNetInvalid;
369  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
370  }
371 }
372 
373 void
374 GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt,
375  const EndpointId& endpointId)
376 {
377  BOOST_ASSERT(netPkt.type() == tlv::Interest);
378  BOOST_ASSERT(!firstPkt.has<lp::NackField>());
379 
380  // forwarding expects Interest to be created with make_shared
381  auto interest = make_shared<Interest>(netPkt);
382 
383  // Increment HopCount
384  if (firstPkt.has<lp::HopCountTagField>()) {
385  interest->setTag(make_shared<lp::HopCountTag>(firstPkt.get<lp::HopCountTagField>() + 1));
386  }
387 
388  if (m_options.enableGeoTags && firstPkt.has<lp::GeoTagField>()) {
389  interest->setTag(make_shared<lp::GeoTag>(firstPkt.get<lp::GeoTagField>()));
390  }
391 
392  if (firstPkt.has<lp::NextHopFaceIdField>()) {
393  if (m_options.allowLocalFields) {
394  interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
395  }
396  else {
397  NFD_LOG_FACE_WARN("received NextHopFaceId, but local fields disabled: DROP");
398  return;
399  }
400  }
401 
402  if (firstPkt.has<lp::CachePolicyField>()) {
403  ++nInNetInvalid;
404  NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
405  return;
406  }
407 
408  if (firstPkt.has<lp::IncomingFaceIdField>()) {
409  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
410  }
411 
412  if (firstPkt.has<lp::CongestionMarkField>()) {
413  interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
414  }
415 
416  if (firstPkt.has<lp::NonDiscoveryField>()) {
417  if (m_options.allowSelfLearning) {
418  interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
419  }
420  else {
421  NFD_LOG_FACE_WARN("received NonDiscovery, but self-learning disabled: IGNORE");
422  }
423  }
424 
425  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
426  ++nInNetInvalid;
427  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Interest: DROP");
428  return;
429  }
430 
431  if (firstPkt.has<lp::PitTokenField>()) {
432  interest->setTag(make_shared<lp::PitToken>(firstPkt.get<lp::PitTokenField>()));
433  }
434 
435  this->receiveInterest(*interest, endpointId);
436 }
437 
438 void
439 GenericLinkService::decodeData(const Block& netPkt, const lp::Packet& firstPkt,
440  const EndpointId& endpointId)
441 {
442  BOOST_ASSERT(netPkt.type() == tlv::Data);
443 
444  // forwarding expects Data to be created with make_shared
445  auto data = make_shared<Data>(netPkt);
446 
447  if (firstPkt.has<lp::HopCountTagField>()) {
448  data->setTag(make_shared<lp::HopCountTag>(firstPkt.get<lp::HopCountTagField>() + 1));
449  }
450 
451  if (m_options.enableGeoTags && firstPkt.has<lp::GeoTagField>()) {
452  data->setTag(make_shared<lp::GeoTag>(firstPkt.get<lp::GeoTagField>()));
453  }
454 
455  if (firstPkt.has<lp::NackField>()) {
456  ++nInNetInvalid;
457  NFD_LOG_FACE_WARN("received Nack with Data: DROP");
458  return;
459  }
460 
461  if (firstPkt.has<lp::NextHopFaceIdField>()) {
462  ++nInNetInvalid;
463  NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
464  return;
465  }
466 
467  if (firstPkt.has<lp::CachePolicyField>()) {
468  // CachePolicy is unprivileged and does not require allowLocalFields option.
469  // In case of an invalid CachePolicyType, get<lp::CachePolicyField> will throw,
470  // so it's unnecessary to check here.
471  data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
472  }
473 
474  if (firstPkt.has<lp::IncomingFaceIdField>()) {
475  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
476  }
477 
478  if (firstPkt.has<lp::CongestionMarkField>()) {
479  data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
480  }
481 
482  if (firstPkt.has<lp::NonDiscoveryField>()) {
483  ++nInNetInvalid;
484  NFD_LOG_FACE_WARN("received NonDiscovery with Data: DROP");
485  return;
486  }
487 
488  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
489  if (m_options.allowSelfLearning) {
490  data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
491  }
492  else {
493  NFD_LOG_FACE_WARN("received PrefixAnnouncement, but self-learning disabled: IGNORE");
494  }
495  }
496 
497  this->receiveData(*data, endpointId);
498 }
499 
500 void
501 GenericLinkService::decodeNack(const Block& netPkt, const lp::Packet& firstPkt,
502  const EndpointId& endpointId)
503 {
504  BOOST_ASSERT(netPkt.type() == tlv::Interest);
505  BOOST_ASSERT(firstPkt.has<lp::NackField>());
506 
507  lp::Nack nack((Interest(netPkt)));
508  nack.setHeader(firstPkt.get<lp::NackField>());
509 
510  if (firstPkt.has<lp::NextHopFaceIdField>()) {
511  ++nInNetInvalid;
512  NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
513  return;
514  }
515 
516  if (firstPkt.has<lp::CachePolicyField>()) {
517  ++nInNetInvalid;
518  NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
519  return;
520  }
521 
522  if (firstPkt.has<lp::IncomingFaceIdField>()) {
523  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
524  }
525 
526  if (firstPkt.has<lp::CongestionMarkField>()) {
527  nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
528  }
529 
530  if (firstPkt.has<lp::NonDiscoveryField>()) {
531  ++nInNetInvalid;
532  NFD_LOG_FACE_WARN("received NonDiscovery with Nack: DROP");
533  return;
534  }
535 
536  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
537  ++nInNetInvalid;
538  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Nack: DROP");
539  return;
540  }
541 
542  this->receiveNack(nack, endpointId);
543 }
544 
545 } // namespace face
546 } // namespace nfd
void setTag(shared_ptr< T > tag) const
set a tag item
Definition: tag-host.hpp:79
NDN_CXX_NODISCARD bool has() const
Definition: packet.hpp:74
shared_ptr< T > getTag() const
get a tag item
Definition: tag-host.hpp:66
constexpr size_t sizeOfNonNegativeInteger(uint64_t integer) noexcept
Get the number of bytes necessary to hold the value of integer encoded as NonNegativeInteger.
Definition: tlv.hpp:506
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Packet & set(const typename FIELD::ValueType &value)
remove all occurrences of FIELD, and add a FIELD with value
Definition: packet.hpp:136
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
Definition: transport.hpp:91
static time_point now() noexcept
Definition: time.cpp:80
Packet & add(const typename FIELD::ValueType &value)
add a FIELD with value
Definition: packet.hpp:148
Nack & setHeader(const NackHeader &header)
Definition: nack.hpp:75
Represents a TLV element of the NDN packet format.
Definition: block.hpp:44
Represents an Interest packet.
Definition: interest.hpp:48
const NackHeader & getHeader() const
Definition: nack.hpp:63
uint64_t EndpointId
Identifies a remote endpoint on the link.
Definition: face-common.hpp:71
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
represents a Network Nack
Definition: nack.hpp:38
Declare a field.
Definition: field-decl.hpp:176
provides a tag type for simple types
Definition: tag.hpp:58
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
FIELD::ValueType get(size_t index=0) const
Definition: packet.hpp:96
constexpr size_t sizeOfVarNumber(uint64_t number) noexcept
Get the number of bytes necessary to hold the value of number encoded as VAR-NUMBER.
Definition: tlv.hpp:454
Copyright (c) 2011-2015 Regents of the University of California.
Definition: ndn-common.hpp:39
base class to allow simple management of packet tags
Definition: packet-base.hpp:31
size_t wireEncode(EncodingImpl< TAG > &encoder) const
Prepend wire encoding to encoder.
Definition: interest.cpp:60
uint32_t type() const noexcept
Return the TLV-TYPE of the Block.
Definition: block.hpp:277
size_t wireEncode(EncodingImpl< TAG > &encoder, bool wantUnsignedPortionOnly=false) const
Prepend wire encoding to encoder.
Definition: data.cpp:46
const ssize_t MIN_MTU
Minimum MTU that may be set.
Definition: face-common.hpp:61
Represents a Data packet.
Definition: data.hpp:37
constexpr size_t CONGESTION_MARK_SIZE
const Interest & getInterest() const
Definition: nack.hpp:51
boost::chrono::nanoseconds nanoseconds
Definition: time.hpp:50
represents an error in TLV encoding or decoding
Definition: tlv.hpp:52
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
represent a PIT token field
Definition: pit-token.hpp:34