NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.5: NDN, CCN, CCNx, content centric networks
API Documentation
lp-reliability.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2018, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "lp-reliability.hpp"
27 #include "generic-link-service.hpp"
28 #include "transport.hpp"
29 
30 namespace nfd {
31 namespace face {
32 
34  : m_options(options)
35  , m_linkService(linkService)
36  , m_firstUnackedFrag(m_unackedFrags.begin())
37  , m_lastTxSeqNo(-1) // set to "-1" to start TxSequence numbers at 0
38  , m_isIdleAckTimerRunning(false)
39 {
40  BOOST_ASSERT(m_linkService != nullptr);
41 
42  BOOST_ASSERT(m_options.idleAckTimerPeriod > time::nanoseconds::zero());
43 }
44 
45 void
47 {
48  BOOST_ASSERT(options.idleAckTimerPeriod > time::nanoseconds::zero());
49 
50  if (m_options.isEnabled && !options.isEnabled) {
51  this->stopIdleAckTimer();
52  }
53 
54  m_options = options;
55 }
56 
57 const GenericLinkService*
59 {
60  return m_linkService;
61 }
62 
63 void
64 LpReliability::handleOutgoing(std::vector<lp::Packet>& frags, lp::Packet&& pkt, bool isInterest)
65 {
66  BOOST_ASSERT(m_options.isEnabled);
67 
68  auto unackedFragsIt = m_unackedFrags.begin();
69  auto sendTime = time::steady_clock::now();
70 
71  auto netPkt = make_shared<NetPkt>(std::move(pkt), isInterest);
72  netPkt->unackedFrags.reserve(frags.size());
73 
74  for (lp::Packet& frag : frags) {
75  // Assign TxSequence number
76  lp::Sequence txSeq = assignTxSequence(frag);
77 
78  // Store LpPacket for future retransmissions
79  unackedFragsIt = m_unackedFrags.emplace_hint(unackedFragsIt,
80  std::piecewise_construct,
81  std::forward_as_tuple(txSeq),
82  std::forward_as_tuple(frag));
83  unackedFragsIt->second.sendTime = sendTime;
84  unackedFragsIt->second.rtoTimer =
85  scheduler::schedule(m_rto.computeRto(), bind(&LpReliability::onLpPacketLost, this, txSeq));
86  unackedFragsIt->second.netPkt = netPkt;
87 
88  if (m_unackedFrags.size() == 1) {
89  m_firstUnackedFrag = m_unackedFrags.begin();
90  }
91 
92  // Add to associated NetPkt
93  netPkt->unackedFrags.push_back(unackedFragsIt);
94  }
95 }
96 
97 void
99 {
100  BOOST_ASSERT(m_options.isEnabled);
101 
102  auto now = time::steady_clock::now();
103 
104  // Extract and parse Acks
105  for (lp::Sequence ackSeq : pkt.list<lp::AckField>()) {
106  auto fragIt = m_unackedFrags.find(ackSeq);
107  if (fragIt == m_unackedFrags.end()) {
108  // Ignore an Ack for an unknown TxSequence number
109  continue;
110  }
111  auto& frag = fragIt->second;
112 
113  // Cancel the RTO timer for the acknowledged fragment
114  frag.rtoTimer.cancel();
115 
116  if (frag.retxCount == 0) {
117  // This sequence had no retransmissions, so use it to calculate the RTO
118  m_rto.addMeasurement(time::duration_cast<RttEstimator::Duration>(now - frag.sendTime));
119  }
120 
121  // Look for frags with TxSequence numbers < ackSeq (allowing for wraparound) and consider them
122  // lost if a configurable number of Acks containing greater TxSequence numbers have been
123  // received.
124  auto lostLpPackets = findLostLpPackets(fragIt);
125 
126  // Remove the fragment from the map of unacknowledged fragments and from its associated network
127  // packet. Potentially increment the start of the window.
128  onLpPacketAcknowledged(fragIt);
129 
130  // This set contains TxSequences that have been removed by onLpPacketLost below because they
131  // were part of a network packet that was removed due to a fragment exceeding retx, as well as
132  // any other TxSequences removed by onLpPacketLost. This prevents onLpPacketLost from being
133  // called later for an invalid iterator.
134  std::set<lp::Sequence> removedLpPackets;
135 
136  // Resend or fail fragments considered lost. Potentially increment the start of the window.
137  for (lp::Sequence txSeq : lostLpPackets) {
138  if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
139  auto removedThisTxSeq = this->onLpPacketLost(txSeq);
140  for (auto removedTxSeq : removedThisTxSeq) {
141  removedLpPackets.insert(removedTxSeq);
142  }
143  }
144  }
145  }
146 
147  // If packet has Fragment and TxSequence fields, extract TxSequence and add to AckQueue
148  if (pkt.has<lp::FragmentField>() && pkt.has<lp::TxSequenceField>()) {
149  m_ackQueue.push(pkt.get<lp::TxSequenceField>());
150  if (!m_isIdleAckTimerRunning) {
151  this->startIdleAckTimer();
152  }
153  }
154 }
155 
156 void
158 {
159  BOOST_ASSERT(m_options.isEnabled);
160  BOOST_ASSERT(pkt.wireEncode().type() == lp::tlv::LpPacket);
161 
162  // up to 2 extra octets reserved for potential TLV-LENGTH size increases
163  ssize_t pktSize = pkt.wireEncode().size();
164  ssize_t reservedSpace = tlv::sizeOfVarNumber(ndn::MAX_NDN_PACKET_SIZE) -
165  tlv::sizeOfVarNumber(pktSize);
166  ssize_t remainingSpace = (mtu == MTU_UNLIMITED ? ndn::MAX_NDN_PACKET_SIZE : mtu) - reservedSpace;
167  remainingSpace -= pktSize;
168 
169  while (!m_ackQueue.empty()) {
170  lp::Sequence ackSeq = m_ackQueue.front();
171  // Ack size = Ack TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + uint64_t (8 octets)
172  const ssize_t ackSize = tlv::sizeOfVarNumber(lp::tlv::Ack) +
174  sizeof(lp::Sequence);
175 
176  if (ackSize > remainingSpace) {
177  break;
178  }
179 
180  pkt.add<lp::AckField>(ackSeq);
181  m_ackQueue.pop();
182  remainingSpace -= ackSize;
183  }
184 }
185 
187 LpReliability::assignTxSequence(lp::Packet& frag)
188 {
189  lp::Sequence txSeq = ++m_lastTxSeqNo;
190  frag.set<lp::TxSequenceField>(txSeq);
191  if (m_unackedFrags.size() > 0 && m_lastTxSeqNo == m_firstUnackedFrag->first) {
192  BOOST_THROW_EXCEPTION(std::length_error("TxSequence range exceeded"));
193  }
194  return m_lastTxSeqNo;
195 }
196 
197 void
198 LpReliability::startIdleAckTimer()
199 {
200  BOOST_ASSERT(!m_isIdleAckTimerRunning);
201  m_isIdleAckTimerRunning = true;
202 
203  m_idleAckTimer = scheduler::schedule(m_options.idleAckTimerPeriod, [this] {
204  while (!m_ackQueue.empty()) {
205  m_linkService->requestIdlePacket();
206  }
207 
208  m_isIdleAckTimerRunning = false;
209  });
210 }
211 
212 void
213 LpReliability::stopIdleAckTimer()
214 {
215  m_idleAckTimer.cancel();
216  m_isIdleAckTimerRunning = false;
217 }
218 
219 std::vector<lp::Sequence>
220 LpReliability::findLostLpPackets(LpReliability::UnackedFrags::iterator ackIt)
221 {
222  std::vector<lp::Sequence> lostLpPackets;
223 
224  for (auto it = m_firstUnackedFrag; ; ++it) {
225  if (it == m_unackedFrags.end()) {
226  it = m_unackedFrags.begin();
227  }
228 
229  if (it->first == ackIt->first) {
230  break;
231  }
232 
233  auto& unackedFrag = it->second;
234  unackedFrag.nGreaterSeqAcks++;
235 
236  if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
237  lostLpPackets.push_back(it->first);
238  }
239  }
240 
241  return lostLpPackets;
242 }
243 
244 std::vector<lp::Sequence>
245 LpReliability::onLpPacketLost(lp::Sequence txSeq)
246 {
247  BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
248  auto txSeqIt = m_unackedFrags.find(txSeq);
249 
250  auto& txFrag = txSeqIt->second;
251  txFrag.rtoTimer.cancel();
252  auto netPkt = txFrag.netPkt;
253  std::vector<lp::Sequence> removedThisTxSeq;
254 
255  // Check if maximum number of retransmissions exceeded
256  if (txFrag.retxCount >= m_options.maxRetx) {
257  // Delete all LpPackets of NetPkt from m_unackedFrags (except this one)
258  for (size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
259  if (netPkt->unackedFrags[i] != txSeqIt) {
260  removedThisTxSeq.push_back(netPkt->unackedFrags[i]->first);
261  deleteUnackedFrag(netPkt->unackedFrags[i]);
262  }
263  }
264 
265  ++m_linkService->nRetxExhausted;
266 
267  // Notify strategy of dropped Interest (if any)
268  if (netPkt->isInterest) {
269  BOOST_ASSERT(netPkt->pkt.has<lp::FragmentField>());
270  ndn::Buffer::const_iterator fragBegin, fragEnd;
271  std::tie(fragBegin, fragEnd) = netPkt->pkt.get<lp::FragmentField>();
272  Block frag(&*fragBegin, std::distance(fragBegin, fragEnd));
273  onDroppedInterest(Interest(frag));
274  }
275 
276  removedThisTxSeq.push_back(txSeqIt->first);
277  deleteUnackedFrag(txSeqIt);
278  }
279  else {
280  // Assign new TxSequence
281  lp::Sequence newTxSeq = assignTxSequence(txFrag.pkt);
282  netPkt->didRetx = true;
283 
284  // Move fragment to new TxSequence mapping
285  auto newTxFragIt = m_unackedFrags.emplace_hint(
286  m_firstUnackedFrag != m_unackedFrags.end() && m_firstUnackedFrag->first > newTxSeq
287  ? m_firstUnackedFrag
288  : m_unackedFrags.end(),
289  std::piecewise_construct,
290  std::forward_as_tuple(newTxSeq),
291  std::forward_as_tuple(txFrag.pkt));
292  auto& newTxFrag = newTxFragIt->second;
293  newTxFrag.retxCount = txFrag.retxCount + 1;
294  newTxFrag.netPkt = netPkt;
295 
296  // Update associated NetPkt
297  auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), txSeqIt);
298  BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
299  *fragInNetPkt = newTxFragIt;
300 
301  removedThisTxSeq.push_back(txSeqIt->first);
302  deleteUnackedFrag(txSeqIt);
303 
304  // Retransmit fragment
305  m_linkService->sendLpPacket(lp::Packet(newTxFrag.pkt));
306 
307  // Start RTO timer for this sequence
308  newTxFrag.rtoTimer = scheduler::schedule(m_rto.computeRto(),
309  bind(&LpReliability::onLpPacketLost, this, newTxSeq));
310  }
311 
312  return removedThisTxSeq;
313 }
314 
315 void
316 LpReliability::onLpPacketAcknowledged(UnackedFrags::iterator fragIt)
317 {
318  auto netPkt = fragIt->second.netPkt;
319 
320  // Remove from NetPkt unacked fragment list
321  auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), fragIt);
322  BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
323  *fragInNetPkt = netPkt->unackedFrags.back();
324  netPkt->unackedFrags.pop_back();
325 
326  // Check if network-layer packet completely received. If so, increment counters
327  if (netPkt->unackedFrags.empty()) {
328  if (netPkt->didRetx) {
329  ++m_linkService->nRetransmitted;
330  }
331  else {
332  ++m_linkService->nAcknowledged;
333  }
334  }
335 
336  deleteUnackedFrag(fragIt);
337 }
338 
339 void
340 LpReliability::deleteUnackedFrag(UnackedFrags::iterator fragIt)
341 {
342  lp::Sequence firstUnackedTxSeq = m_firstUnackedFrag->first;
343  lp::Sequence currentTxSeq = fragIt->first;
344  auto nextFragIt = m_unackedFrags.erase(fragIt);
345 
346  if (!m_unackedFrags.empty() && firstUnackedTxSeq == currentTxSeq) {
347  // If "first" fragment in send window (allowing for wraparound), increment window begin
348  if (nextFragIt == m_unackedFrags.end()) {
349  m_firstUnackedFrag = m_unackedFrags.begin();
350  }
351  else {
352  m_firstUnackedFrag = nextFragIt;
353  }
354  }
355  else if (m_unackedFrags.empty()) {
356  m_firstUnackedFrag = m_unackedFrags.end();
357  }
358 }
359 
360 LpReliability::UnackedFrag::UnackedFrag(lp::Packet pkt)
361  : pkt(std::move(pkt))
362  , sendTime(time::steady_clock::now())
363  , retxCount(0)
364  , nGreaterSeqAcks(0)
365 {
366 }
367 
368 LpReliability::NetPkt::NetPkt(lp::Packet&& pkt, bool isInterest)
369  : pkt(std::move(pkt))
370  , isInterest(isInterest)
371  , didRetx(false)
372 {
373 }
374 
375 } // namespace face
376 } // namespace nfd
Duration computeRto() const
std::vector< typename FIELD::ValueType > list() const
Definition: packet.hpp:121
void setOptions(const Options &options)
set options for reliability
GenericLinkService is a LinkService that implements the NDNLPv2 protocol.
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
Packet & set(const typename FIELD::ValueType &value)
remove all occurrences of FIELD, and add a FIELD with value
Definition: packet.hpp:141
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
Definition: transport.hpp:96
static time_point now() noexcept
Definition: time.cpp:80
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
Packet & add(const typename FIELD::ValueType &value)
add a FIELD with value
Definition: packet.hpp:153
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
STL namespace.
bool has() const
Definition: packet.hpp:78
bool isEnabled
enables link-layer reliability
uint64_t Sequence
represents a sequence number
Definition: sequence.hpp:35
Declare a field.
Definition: field-decl.hpp:178
Table::const_iterator iterator
Definition: cs-internal.hpp:41
void addMeasurement(Duration measure)
size_t size() const
Get size of encoded wire, including Type-Length-Value.
Definition: block.cpp:301
FIELD::ValueType get(size_t index=0) const
Definition: packet.hpp:101
Copyright (c) 2011-2015 Regents of the University of California.
Definition: ndn-common.hpp:40
const GenericLinkService * getLinkService() const
LpReliability(const Options &options, GenericLinkService *linkService)
EventId schedule(time::nanoseconds after, const EventCallback &event)
Schedule an event.
Definition: scheduler.cpp:48
Block wireEncode() const
encode packet into wire format
Definition: packet.cpp:131
time::nanoseconds idleAckTimerPeriod
period between sending pending Acks in an IDLE packet
constexpr size_t sizeOfVarNumber(uint64_t number)
Get number of bytes necessary to hold value of VAR-NUMBER.
Definition: tlv.hpp:438
uint32_t type() const
Get TLV-TYPE.
Definition: block.hpp:235
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size
Definition: tlv.hpp:39