35 , m_linkService(linkService)
36 , m_firstUnackedFrag(m_unackedFrags.begin())
38 , m_isIdleAckTimerRunning(false)
40 BOOST_ASSERT(m_linkService !=
nullptr);
51 this->stopIdleAckTimer();
68 auto unackedFragsIt = m_unackedFrags.begin();
71 auto netPkt = make_shared<NetPkt>(std::move(pkt), isInterest);
72 netPkt->unackedFrags.reserve(frags.size());
79 unackedFragsIt = m_unackedFrags.emplace_hint(unackedFragsIt,
80 std::piecewise_construct,
81 std::forward_as_tuple(txSeq),
82 std::forward_as_tuple(frag));
83 unackedFragsIt->second.sendTime = sendTime;
85 unackedFragsIt->second.netPkt = netPkt;
87 if (m_unackedFrags.size() == 1) {
88 m_firstUnackedFrag = m_unackedFrags.begin();
92 netPkt->unackedFrags.push_back(unackedFragsIt);
105 auto fragIt = m_unackedFrags.find(ackSeq);
106 if (fragIt == m_unackedFrags.end()) {
110 auto& frag = fragIt->second;
113 frag.rtoTimer.cancel();
115 if (frag.retxCount == 0) {
117 m_rto.
addMeasurement(time::duration_cast<RttEstimator::Duration>(now - frag.sendTime));
123 auto lostLpPackets = findLostLpPackets(fragIt);
127 onLpPacketAcknowledged(fragIt);
133 std::set<lp::Sequence> removedLpPackets;
137 if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
138 auto removedThisTxSeq = this->onLpPacketLost(txSeq);
139 for (
auto removedTxSeq : removedThisTxSeq) {
140 removedLpPackets.insert(removedTxSeq);
149 if (!m_isIdleAckTimerRunning) {
150 this->startIdleAckTimer();
166 remainingSpace -= pktSize;
168 while (!m_ackQueue.empty()) {
175 if (ackSize > remainingSpace) {
181 remainingSpace -= ackSize;
186 LpReliability::assignTxSequence(
lp::Packet& frag)
190 if (m_unackedFrags.size() > 0 && m_lastTxSeqNo == m_firstUnackedFrag->first) {
191 BOOST_THROW_EXCEPTION(std::length_error(
"TxSequence range exceeded"));
193 return m_lastTxSeqNo;
197 LpReliability::startIdleAckTimer()
199 BOOST_ASSERT(!m_isIdleAckTimerRunning);
200 m_isIdleAckTimerRunning =
true;
203 while (!m_ackQueue.empty()) {
204 m_linkService->requestIdlePacket();
207 m_isIdleAckTimerRunning =
false;
212 LpReliability::stopIdleAckTimer()
215 m_isIdleAckTimerRunning =
false;
218 std::vector<lp::Sequence>
221 std::vector<lp::Sequence> lostLpPackets;
223 for (
auto it = m_firstUnackedFrag; ; ++it) {
224 if (it == m_unackedFrags.end()) {
225 it = m_unackedFrags.begin();
228 if (it->first == ackIt->first) {
232 auto& unackedFrag = it->second;
233 unackedFrag.nGreaterSeqAcks++;
235 if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
236 lostLpPackets.push_back(it->first);
240 return lostLpPackets;
243 std::vector<lp::Sequence>
246 BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
247 auto txSeqIt = m_unackedFrags.find(txSeq);
249 auto& txFrag = txSeqIt->second;
250 txFrag.rtoTimer.cancel();
251 auto netPkt = txFrag.netPkt;
252 std::vector<lp::Sequence> removedThisTxSeq;
255 if (txFrag.retxCount >= m_options.maxRetx) {
257 for (
size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
258 if (netPkt->unackedFrags[i] != txSeqIt) {
259 removedThisTxSeq.push_back(netPkt->unackedFrags[i]->first);
260 deleteUnackedFrag(netPkt->unackedFrags[i]);
264 ++m_linkService->nRetxExhausted;
267 if (netPkt->isInterest) {
269 ndn::Buffer::const_iterator fragBegin, fragEnd;
271 Block frag(&*fragBegin, std::distance(fragBegin, fragEnd));
275 removedThisTxSeq.push_back(txSeqIt->first);
276 deleteUnackedFrag(txSeqIt);
281 netPkt->didRetx =
true;
284 auto newTxFragIt = m_unackedFrags.emplace_hint(
285 m_firstUnackedFrag != m_unackedFrags.end() && m_firstUnackedFrag->first > newTxSeq
287 : m_unackedFrags.end(),
288 std::piecewise_construct,
289 std::forward_as_tuple(newTxSeq),
290 std::forward_as_tuple(txFrag.pkt));
291 auto& newTxFrag = newTxFragIt->second;
292 newTxFrag.retxCount = txFrag.retxCount + 1;
293 newTxFrag.netPkt = netPkt;
296 auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), txSeqIt);
297 BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
298 *fragInNetPkt = newTxFragIt;
300 removedThisTxSeq.push_back(txSeqIt->first);
301 deleteUnackedFrag(txSeqIt);
304 m_linkService->sendLpPacket(
lp::Packet(newTxFrag.pkt));
307 newTxFrag.rtoTimer =
scheduler::schedule(m_rto.computeRto(), [=] { onLpPacketLost(newTxSeq); });
310 return removedThisTxSeq;
316 auto netPkt = fragIt->second.netPkt;
319 auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), fragIt);
320 BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
321 *fragInNetPkt = netPkt->unackedFrags.back();
322 netPkt->unackedFrags.pop_back();
325 if (netPkt->unackedFrags.empty()) {
326 if (netPkt->didRetx) {
327 ++m_linkService->nRetransmitted;
330 ++m_linkService->nAcknowledged;
334 deleteUnackedFrag(fragIt);
340 lp::Sequence firstUnackedTxSeq = m_firstUnackedFrag->first;
342 auto nextFragIt = m_unackedFrags.erase(fragIt);
344 if (!m_unackedFrags.empty() && firstUnackedTxSeq == currentTxSeq) {
346 if (nextFragIt == m_unackedFrags.end()) {
347 m_firstUnackedFrag = m_unackedFrags.begin();
350 m_firstUnackedFrag = nextFragIt;
353 else if (m_unackedFrags.empty()) {
354 m_firstUnackedFrag = m_unackedFrags.end();
358 LpReliability::UnackedFrag::UnackedFrag(
lp::Packet pkt)
359 : pkt(std::move(pkt))
360 , sendTime(
time::steady_clock::now())
366 LpReliability::NetPkt::NetPkt(
lp::Packet&& pkt,
bool isInterest)
367 : pkt(std::move(pkt))
368 , isInterest(isInterest)
Duration computeRto() const
std::vector< typename FIELD::ValueType > list() const
void setOptions(const Options &options)
set options for reliability
GenericLinkService is a LinkService that implements the NDNLPv2 protocol.
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue
Packet & set(const typename FIELD::ValueType &value)
remove all occurrences of FIELD, and add a FIELD with value
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
static time_point now() noexcept
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
Packet & add(const typename FIELD::ValueType &value)
add a FIELD with value
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission
bool isEnabled
enables link-layer reliability
uint64_t Sequence
represents a sequence number
Table::const_iterator iterator
void addMeasurement(Duration measure)
size_t size() const
Get size of encoded wire, including Type-Length-Value.
FIELD::ValueType get(size_t index=0) const
constexpr size_t sizeOfVarNumber(uint64_t number) noexcept
Get the number of bytes necessary to hold the value of number encoded as VAR-NUMBER.
Copyright (c) 2011-2015 Regents of the University of California.
const GenericLinkService * getLinkService() const
LpReliability(const Options &options, GenericLinkService *linkService)
EventId schedule(time::nanoseconds after, const EventCallback &event)
Schedule an event.
Block wireEncode() const
encode packet into wire format
time::nanoseconds idleAckTimerPeriod
period between sending pending Acks in an IDLE packet
void cancel() const
Cancel the operation.
uint32_t type() const
Get TLV-TYPE.
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size