44 , m_fragmenter(m_options.fragmenterOptions, this)
45 , m_reassembler(m_options.reassemblerOptions, this)
46 , m_reliability(m_options.reliabilityOptions, this)
48 , m_nextMarkTime(
time::steady_clock::time_point::max())
49 , m_nMarkedSinceInMarkingState(0)
51 m_reassembler.beforeTimeout.connect([
this] (
auto&&...) { ++nReassemblyTimeouts; });
52 m_reliability.onDroppedInterest.connect([
this] (
const auto& i) { notifyDroppedInterest(i); });
53 nReassembling.observe(&m_reassembler);
57 GenericLinkService::setOptions(
const GenericLinkService::Options& options)
60 m_fragmenter.setOptions(m_options.fragmenterOptions);
61 m_reassembler.setOptions(m_options.reassemblerOptions);
62 m_reliability.setOptions(m_options.reliabilityOptions);
66 GenericLinkService::getEffectiveMtu()
const 69 return std::min(m_options.overrideMtu, getTransport()->getMtu());
73 GenericLinkService::canOverrideMtuTo(ssize_t mtu)
const 85 GenericLinkService::requestIdlePacket()
90 this->sendLpPacket({});
94 GenericLinkService::sendLpPacket(
lp::Packet&& pkt)
96 const ssize_t mtu = getEffectiveMtu();
98 if (m_options.reliabilityOptions.isEnabled) {
99 m_reliability.piggyback(pkt, mtu);
102 if (m_options.allowCongestionMarking) {
103 checkCongestionLevel(pkt);
106 auto block = pkt.wireEncode();
107 if (mtu !=
MTU_UNLIMITED && block.size() >
static_cast<size_t>(mtu)) {
112 this->sendPacket(block);
116 GenericLinkService::doSendInterest(
const Interest& interest)
120 encodeLpFields(interest, lpPacket);
122 this->sendNetPacket(
std::move(lpPacket),
true);
126 GenericLinkService::doSendData(
const Data& data)
130 encodeLpFields(data, lpPacket);
132 this->sendNetPacket(
std::move(lpPacket),
false);
136 GenericLinkService::doSendNack(
const lp::Nack& nack)
141 encodeLpFields(nack, lpPacket);
143 this->sendNetPacket(
std::move(lpPacket),
false);
147 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
149 std::for_each(pkts.begin(), pkts.end(), [
this] (
lp::Packet& pkt) {
157 if (m_options.allowLocalFields) {
159 if (incomingFaceIdTag !=
nullptr) {
165 if (congestionMarkTag !=
nullptr) {
169 if (m_options.allowSelfLearning) {
171 if (nonDiscoveryTag !=
nullptr) {
176 if (prefixAnnouncementTag !=
nullptr) {
182 if (pitToken !=
nullptr) {
187 if (hopCountTag !=
nullptr) {
194 if (m_options.enableGeoTags) {
195 auto geoTag = m_options.enableGeoTags();
196 if (geoTag !=
nullptr) {
203 GenericLinkService::sendNetPacket(
lp::Packet&& pkt,
bool isInterest)
205 std::vector<lp::Packet> frags;
206 ssize_t mtu = getEffectiveMtu();
209 if (m_options.reliabilityOptions.isEnabled && mtu !=
MTU_UNLIMITED) {
213 if (m_options.allowCongestionMarking && mtu !=
MTU_UNLIMITED) {
222 std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
225 ++nFragmentationErrors;
230 if (m_options.reliabilityOptions.isEnabled) {
231 frags.push_back(pkt);
238 if (frags.size() == 1) {
246 if (m_options.reliabilityOptions.isEnabled || frags.size() > 1) {
248 this->assignSequences(frags);
251 if (m_options.reliabilityOptions.isEnabled && frags.front().has<
lp::FragmentField>()) {
252 m_reliability.handleOutgoing(frags,
std::move(pkt), isInterest);
261 GenericLinkService::checkCongestionLevel(
lp::Packet& pkt)
263 ssize_t sendQueueLength = getTransport()->getSendQueueLength();
265 if (sendQueueLength < 0) {
269 if (sendQueueLength > 0) {
271 m_options.defaultCongestionThreshold <<
" capacity=" <<
272 getTransport()->getSendQueueCapacity());
276 if (static_cast<size_t>(sendQueueLength) > m_options.defaultCongestionThreshold) {
279 if (m_nextMarkTime == time::steady_clock::time_point::max()) {
280 m_nextMarkTime = now + m_options.baseCongestionMarkingInterval;
283 else if (now >= m_nextMarkTime) {
288 ++m_nMarkedSinceInMarkingState;
292 m_options.baseCongestionMarkingInterval.count() /
293 std::sqrt(m_nMarkedSinceInMarkingState + 1)));
294 m_nextMarkTime += interval;
297 else if (m_nextMarkTime != time::steady_clock::time_point::max()) {
300 m_nextMarkTime = time::steady_clock::time_point::max();
301 m_nMarkedSinceInMarkingState = 0;
306 GenericLinkService::doReceivePacket(
const Block& packet,
const EndpointId& endpoint)
311 if (m_options.reliabilityOptions.isEnabled) {
312 if (!m_reliability.processIncomingPacket(pkt)) {
314 ++nDuplicateSequence;
325 !m_options.allowReassembly) {
330 bool isReassembled =
false;
333 std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(endpoint, pkt);
335 this->decodeNetPacket(netPkt, firstPkt, endpoint);
345 GenericLinkService::decodeNetPacket(
const Block& netPkt,
const lp::Packet& firstPkt,
349 switch (netPkt.
type()) {
352 this->decodeNack(netPkt, firstPkt, endpointId);
355 this->decodeInterest(netPkt, firstPkt, endpointId);
359 this->decodeData(netPkt, firstPkt, endpointId);
374 GenericLinkService::decodeInterest(
const Block& netPkt,
const lp::Packet& firstPkt,
381 auto interest = make_shared<Interest>(netPkt);
393 if (m_options.allowLocalFields) {
417 if (m_options.allowSelfLearning) {
435 this->receiveInterest(*interest, endpointId);
439 GenericLinkService::decodeData(
const Block& netPkt,
const lp::Packet& firstPkt,
445 auto data = make_shared<Data>(netPkt);
489 if (m_options.allowSelfLearning) {
493 NFD_LOG_FACE_WARN(
"received PrefixAnnouncement, but self-learning disabled: IGNORE");
497 this->receiveData(*data, endpointId);
501 GenericLinkService::decodeNack(
const Block& netPkt,
const lp::Packet& firstPkt,
542 this->receiveNack(nack, endpointId);
void setTag(shared_ptr< T > tag) const
set a tag item
NDN_CXX_NODISCARD bool has() const
shared_ptr< T > getTag() const
get a tag item
constexpr size_t sizeOfNonNegativeInteger(uint64_t integer) noexcept
Get the number of bytes necessary to hold the value of integer encoded as NonNegativeInteger.
#define NFD_LOG_INIT(name)
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Packet & set(const typename FIELD::ValueType &value)
remove all occurrences of FIELD, and add a FIELD with value
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
static time_point now() noexcept
Packet & add(const typename FIELD::ValueType &value)
add a FIELD with value
Nack & setHeader(const NackHeader &header)
Represents a TLV element of the NDN packet format.
Represents an Interest packet.
const NackHeader & getHeader() const
uint64_t EndpointId
Identifies a remote endpoint on the link.
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
represents a Network Nack
provides a tag type for simple types
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
FIELD::ValueType get(size_t index=0) const
constexpr size_t sizeOfVarNumber(uint64_t number) noexcept
Get the number of bytes necessary to hold the value of number encoded as VAR-NUMBER.
Copyright (c) 2011-2015 Regents of the University of California.
base class to allow simple management of packet tags
size_t wireEncode(EncodingImpl< TAG > &encoder) const
Prepend wire encoding to encoder.
uint32_t type() const noexcept
Return the TLV-TYPE of the Block.
size_t wireEncode(EncodingImpl< TAG > &encoder, bool wantUnsignedPortionOnly=false) const
Prepend wire encoding to encoder.
const ssize_t MIN_MTU
Minimum MTU that may be set.
Represents a Data packet.
constexpr size_t CONGESTION_MARK_SIZE
const Interest & getInterest() const
boost::chrono::nanoseconds nanoseconds
represents an error in TLV encoding or decoding
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
represent a PIT token field