38 , m_linkService(linkService)
39 , m_firstUnackedFrag(m_unackedFrags.begin())
43 BOOST_ASSERT(m_options.idleAckTimerPeriod > 0_ns);
51 if (m_options.isEnabled && !options.
isEnabled) {
67 BOOST_ASSERT(m_options.isEnabled);
73 netPkt->unackedFrags.reserve(frags.size());
84 std::piecewise_construct,
85 std::forward_as_tuple(txSeq),
86 std::forward_as_tuple(frag));
87 unackedFragsIt->second.sendTime =
sendTime;
91 time::duration_cast<time::milliseconds>(rto).count() <<
"ms");
92 unackedFragsIt->second.rtoTimer =
getScheduler().schedule(rto, [=] {
95 unackedFragsIt->second.netPkt =
netPkt;
102 netPkt->unackedFrags.push_back(unackedFragsIt);
109 BOOST_ASSERT(m_options.isEnabled);
111 bool isDuplicate =
false;
122 auto& frag = fragIt->second;
125 frag.rtoTimer.cancel();
127 if (frag.retxCount == 0) {
129 ackTxSeq <<
", retx=0, rtt=" <<
130 time::duration_cast<time::milliseconds>(now - frag.sendTime).count() <<
"ms");
136 ackTxSeq <<
", retx=" << frag.retxCount);
152 std::set<lp::Sequence> removedLpPackets;
156 if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
158 for (
auto removedTxSeq : removedTxSeqs) {
159 removedLpPackets.insert(removedTxSeq);
195 BOOST_ASSERT(m_options.isEnabled);
203 remainingSpace -= pktSize;
212 if (ackSize > remainingSpace) {
220 remainingSpace -= ackSize;
225 LpReliability::assignTxSequence(
lp::Packet& frag)
230 NDN_THROW(std::length_error(
"TxSequence range exceeded"));
244 while (!m_ackQueue.empty()) {
245 m_linkService->requestIdlePacket();
250 std::vector<lp::Sequence>
253 std::vector<lp::Sequence> lostLpPackets;
260 if (it->first == ackIt->first) {
264 auto& unackedFrag = it->second;
265 unackedFrag.nGreaterSeqAcks++;
267 ", before count=" << unackedFrag.nGreaterSeqAcks);
269 if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
270 lostLpPackets.push_back(it->first);
274 return lostLpPackets;
277 std::vector<lp::Sequence>
283 auto& txFrag = txSeqIt->second;
284 txFrag.rtoTimer.cancel();
285 auto netPkt = txFrag.netPkt;
286 std::vector<lp::Sequence> removedThisTxSeq;
294 " considered lost from acks for more recent txseqs");
298 if (txFrag.retxCount >= m_options.maxRetx) {
301 for (
size_t i = 0; i <
netPkt->unackedFrags.size(); i++) {
302 if (
netPkt->unackedFrags[i] != txSeqIt) {
303 removedThisTxSeq.push_back(
netPkt->unackedFrags[i]->first);
318 removedThisTxSeq.push_back(txSeqIt->first);
331 std::piecewise_construct,
332 std::forward_as_tuple(newTxSeq),
333 std::forward_as_tuple(txFrag.pkt));
334 auto& newTxFrag = newTxFragIt->second;
335 newTxFrag.retxCount = txFrag.retxCount + 1;
336 newTxFrag.netPkt =
netPkt;
339 auto fragInNetPkt = std::find(
netPkt->unackedFrags.begin(),
netPkt->unackedFrags.end(), txSeqIt);
340 BOOST_ASSERT(fragInNetPkt !=
netPkt->unackedFrags.end());
341 *fragInNetPkt = newTxFragIt;
343 removedThisTxSeq.push_back(txSeqIt->first);
350 NFD_LOG_FACE_TRACE(
"retransmitting seq=" << seq <<
", txseq=" << newTxSeq <<
", retx=" <<
351 txFrag.retxCount <<
", rto=" <<
352 time::duration_cast<time::milliseconds>(rto).count() <<
"ms");
360 return removedThisTxSeq;
366 auto netPkt = fragIt->second.netPkt;
369 auto fragInNetPkt = std::find(
netPkt->unackedFrags.begin(),
netPkt->unackedFrags.end(), fragIt);
370 BOOST_ASSERT(fragInNetPkt !=
netPkt->unackedFrags.end());
371 *fragInNetPkt =
netPkt->unackedFrags.back();
372 netPkt->unackedFrags.pop_back();
375 if (
netPkt->unackedFrags.empty()) {
394 if (!
m_unackedFrags.empty() && firstUnackedTxSeq == currentTxSeq) {
418 , isInterest(isInterest)
424 operator<<(std::ostream& os, const FaceLogHelper<LpReliability>& flh)
426 if (flh.obj.getLinkService() ==
nullptr) {
427 os <<
"[id=0,local=unknown,remote=unknown] ";
430 os << FaceLogHelper<LinkService>(*flh.obj.getLinkService());
NDN_CXX_NODISCARD bool has() const
ndn::util::RttEstimator m_rttEst
void setOptions(const Options &options)
set options for reliability
time::nanoseconds getEstimatedRto() const
Returns the estimated RTO value.
#define NFD_LOG_INIT(name)
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Packet & set(const typename FIELD::ValueType &value)
remove all occurrences of FIELD, and add a FIELD with value
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
static time_point now() noexcept
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
Packet & add(const typename FIELD::ValueType &value)
add a FIELD with value
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
void addMeasurement(time::nanoseconds rtt, size_t nExpectedSamples=1)
Records a new RTT measurement.
std::vector< lp::Sequence > onLpPacketLost(lp::Sequence txSeq, bool isTimeout)
resend (or give up on) a lost fragment
UnackedFrags m_unackedFrags
void startIdleAckTimer()
start the idle Ack timer
Represents a TLV element of the NDN packet format.
lp::Sequence m_lastTxSeqNo
std::vector< lp::Sequence > findLostLpPackets(UnackedFrags::iterator ackIt)
find and mark as lost fragments where a configurable number of Acks (m_options.seqNumLossThreshold) have been received for greater sequence numbers
UnackedFrags::iterator m_firstUnackedFrag
An iterator that points to the first unacknowledged fragment in the current window.
bool isEnabled
enables link-layer reliability
uint64_t Sequence
represents a sequence number
std::queue< lp::Sequence > m_ackQueue
Scheduler & getScheduler()
Returns the global Scheduler instance for the calling thread.
std::map< lp::Sequence, time::steady_clock::TimePoint > m_recentRecvSeqs
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
scheduler::ScopedEventId m_idleAckTimer
size_t size() const
Return the size of the encoded wire, i.e., of the whole TLV.
FIELD::ValueType get(size_t index=0) const
constexpr size_t sizeOfVarNumber(uint64_t number) noexcept
Get the number of bytes necessary to hold the value of number encoded as VAR-NUMBER.
Copyright (c) 2011-2015 Regents of the University of California.
const GenericLinkService * getLinkService() const
shared_ptr< NetPkt > netPkt
LpReliability(const Options &options, GenericLinkService *linkService)
void deleteUnackedFrag(UnackedFrags::iterator fragIt)
delete a fragment from UnackedFrags and advance acknowledge window if necessary
time::steady_clock::TimePoint sendTime
uint32_t type() const noexcept
Return the TLV-TYPE of the Block.
NetPkt(lp::Packet &&pkt, bool isInterest)
bool processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
void cancel()
Cancel the operation.
Block wireEncode() const
encode packet into wire format
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
GenericLinkService * m_linkService
std::queue< lp::Sequence > m_recentRecvSeqsQueue
NDN_CXX_NODISCARD std::vector< typename FIELD::ValueType > list() const
time::nanoseconds idleAckTimerPeriod
period between sending pending Acks in an IDLE packet
void onLpPacketAcknowledged(UnackedFrags::iterator fragIt)
remove the fragment with the given sequence number from the map of unacknowledged fragments...
provides for reliable sending and receiving of link-layer packets
const size_t MAX_NDN_PACKET_SIZE
Practical size limit of a network-layer packet.
size_t nGreaterSeqAcks
number of Acks received for sequences greater than this fragment