36 , m_linkService(linkService)
37 , m_firstUnackedFrag(m_unackedFrags.begin())
40 BOOST_ASSERT(m_linkService !=
nullptr);
67 auto unackedFragsIt = m_unackedFrags.begin();
70 auto netPkt = make_shared<NetPkt>(
std::move(pkt), isInterest);
71 netPkt->unackedFrags.reserve(frags.size());
78 unackedFragsIt = m_unackedFrags.emplace_hint(unackedFragsIt,
79 std::piecewise_construct,
80 std::forward_as_tuple(txSeq),
81 std::forward_as_tuple(frag));
82 unackedFragsIt->second.sendTime = sendTime;
84 [=] { onLpPacketLost(txSeq); });
85 unackedFragsIt->second.netPkt = netPkt;
87 if (m_unackedFrags.size() == 1) {
88 m_firstUnackedFrag = m_unackedFrags.begin();
92 netPkt->unackedFrags.push_back(unackedFragsIt);
105 auto fragIt = m_unackedFrags.find(ackSeq);
106 if (fragIt == m_unackedFrags.end()) {
110 auto& frag = fragIt->second;
113 frag.rtoTimer.cancel();
115 if (frag.retxCount == 0) {
123 auto lostLpPackets = findLostLpPackets(fragIt);
127 onLpPacketAcknowledged(fragIt);
133 std::set<lp::Sequence> removedLpPackets;
137 if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
138 auto removedThisTxSeq = onLpPacketLost(txSeq);
139 for (
auto removedTxSeq : removedThisTxSeq) {
140 removedLpPackets.insert(removedTxSeq);
164 remainingSpace -= pktSize;
166 while (!m_ackQueue.empty()) {
173 if (ackSize > remainingSpace) {
179 remainingSpace -= ackSize;
184 LpReliability::assignTxSequence(
lp::Packet& frag)
188 if (m_unackedFrags.size() > 0 && m_lastTxSeqNo == m_firstUnackedFrag->first) {
189 NDN_THROW(std::length_error(
"TxSequence range exceeded"));
191 return m_lastTxSeqNo;
195 LpReliability::startIdleAckTimer()
197 if (m_idleAckTimer) {
203 while (!m_ackQueue.empty()) {
204 m_linkService->requestIdlePacket(0);
209 std::vector<lp::Sequence>
210 LpReliability::findLostLpPackets(LpReliability::UnackedFrags::iterator ackIt)
212 std::vector<lp::Sequence> lostLpPackets;
214 for (
auto it = m_firstUnackedFrag; ; ++it) {
215 if (it == m_unackedFrags.end()) {
216 it = m_unackedFrags.begin();
219 if (it->first == ackIt->first) {
223 auto& unackedFrag = it->second;
224 unackedFrag.nGreaterSeqAcks++;
226 if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
227 lostLpPackets.push_back(it->first);
231 return lostLpPackets;
234 std::vector<lp::Sequence>
237 BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
238 auto txSeqIt = m_unackedFrags.find(txSeq);
240 auto& txFrag = txSeqIt->second;
241 txFrag.rtoTimer.cancel();
242 auto netPkt = txFrag.netPkt;
243 std::vector<lp::Sequence> removedThisTxSeq;
246 if (txFrag.retxCount >= m_options.maxRetx) {
248 for (
size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
249 if (netPkt->unackedFrags[i] != txSeqIt) {
250 removedThisTxSeq.push_back(netPkt->unackedFrags[i]->first);
251 deleteUnackedFrag(netPkt->unackedFrags[i]);
255 ++m_linkService->nRetxExhausted;
258 if (netPkt->isInterest) {
260 ndn::Buffer::const_iterator fragBegin, fragEnd;
262 Block frag(&*fragBegin, std::distance(fragBegin, fragEnd));
266 removedThisTxSeq.push_back(txSeqIt->first);
267 deleteUnackedFrag(txSeqIt);
272 netPkt->didRetx =
true;
275 auto newTxFragIt = m_unackedFrags.emplace_hint(
276 m_firstUnackedFrag != m_unackedFrags.end() && m_firstUnackedFrag->first > newTxSeq
278 : m_unackedFrags.end(),
279 std::piecewise_construct,
280 std::forward_as_tuple(newTxSeq),
281 std::forward_as_tuple(txFrag.pkt));
282 auto& newTxFrag = newTxFragIt->second;
283 newTxFrag.retxCount = txFrag.retxCount + 1;
284 newTxFrag.netPkt = netPkt;
287 auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), txSeqIt);
288 BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
289 *fragInNetPkt = newTxFragIt;
291 removedThisTxSeq.push_back(txSeqIt->first);
292 deleteUnackedFrag(txSeqIt);
295 m_linkService->sendLpPacket(
lp::Packet(newTxFrag.pkt), 0);
298 newTxFrag.rtoTimer =
getScheduler().schedule(m_rttEst.getEstimatedRto(),
299 [=] { onLpPacketLost(newTxSeq); });
302 return removedThisTxSeq;
306 LpReliability::onLpPacketAcknowledged(UnackedFrags::iterator fragIt)
308 auto netPkt = fragIt->second.netPkt;
311 auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), fragIt);
312 BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
313 *fragInNetPkt = netPkt->unackedFrags.back();
314 netPkt->unackedFrags.pop_back();
317 if (netPkt->unackedFrags.empty()) {
318 if (netPkt->didRetx) {
319 ++m_linkService->nRetransmitted;
322 ++m_linkService->nAcknowledged;
326 deleteUnackedFrag(fragIt);
330 LpReliability::deleteUnackedFrag(UnackedFrags::iterator fragIt)
332 lp::Sequence firstUnackedTxSeq = m_firstUnackedFrag->first;
334 auto nextFragIt = m_unackedFrags.erase(fragIt);
336 if (!m_unackedFrags.empty() && firstUnackedTxSeq == currentTxSeq) {
338 if (nextFragIt == m_unackedFrags.end()) {
339 m_firstUnackedFrag = m_unackedFrags.begin();
342 m_firstUnackedFrag = nextFragIt;
345 else if (m_unackedFrags.empty()) {
346 m_firstUnackedFrag = m_unackedFrags.end();
350 LpReliability::UnackedFrag::UnackedFrag(
lp::Packet pkt)
351 : pkt(std::
move(pkt))
352 , sendTime(
time::steady_clock::now())
358 LpReliability::NetPkt::NetPkt(
lp::Packet&& pkt,
bool isInterest)
359 : pkt(std::
move(pkt))
360 , isInterest(isInterest)