37 time::milliseconds interestLifetime)
41 , m_lastSequenceNum(std::numeric_limits<uint64_t>::max())
42 , m_lastNackSequenceNum(std::numeric_limits<uint64_t>::max())
44 , m_scheduler(face.getIoService())
45 , m_lastInterestId(nullptr)
46 , m_interestLifetime(interestLifetime)
59 sendInitialInterest();
69 if (m_lastInterestId !=
nullptr)
71 m_lastInterestId =
nullptr;
75 NotificationSubscriberBase::sendInitialInterest()
80 auto interest = make_shared<Interest>(m_prefix);
81 interest->setCanBePrefix(
true);
82 interest->setMustBeFresh(
true);
83 interest->setInterestLifetime(m_interestLifetime);
84 sendInterest(*interest);
88 NotificationSubscriberBase::sendNextInterest()
93 Name nextName = m_prefix;
96 auto interest = make_shared<Interest>(nextName);
97 interest->setCanBePrefix(
false);
98 interest->setInterestLifetime(m_interestLifetime);
99 sendInterest(*interest);
103 NotificationSubscriberBase::sendInterest(
const Interest& interest)
106 [
this] (
const auto&,
const auto& d) { this->afterReceiveData(d); },
107 [
this] (
const auto&,
const auto& n) { this->afterReceiveNack(n); },
108 [
this] (
const auto&) { this->afterTimeout(); });
112 NotificationSubscriberBase::shouldStop()
117 if (!hasSubscriber() &&
onNack.isEmpty()) {
125 NotificationSubscriberBase::afterReceiveData(
const Data& data)
131 m_lastSequenceNum = data.getName().get(-1).toSequenceNumber();
133 catch (
const tlv::Error&) {
135 sendInitialInterest();
139 if (!decodeAndDeliver(data)) {
141 sendInitialInterest();
149 NotificationSubscriberBase::afterReceiveNack(
const lp::Nack& nack)
156 time::milliseconds delay = exponentialBackoff(nack);
157 m_nackEvent = m_scheduler.
scheduleEvent(delay, [
this] { sendInitialInterest(); });
161 NotificationSubscriberBase::afterTimeout()
168 sendInitialInterest();
172 NotificationSubscriberBase::exponentialBackoff(
lp::Nack nack)
174 uint64_t nackSequenceNum;
178 catch (
const tlv::Error&) {
182 if (m_lastNackSequenceNum == nackSequenceNum) {
189 m_lastNackSequenceNum = nackSequenceNum;
191 return time::milliseconds(static_cast<time::milliseconds::rep>(std::pow(2, m_attempts) * 100 +
void start()
start or resume receiving notifications
Copyright (c) 2011-2015 Regents of the University of California.
uint64_t toSequenceNumber() const
Interpret as sequence number component using NDN naming conventions.
virtual ~NotificationSubscriberBase()
const Component & get(ssize_t i) const
Get the component at the given index.
EventId scheduleEvent(time::nanoseconds after, const EventCallback &callback)
Schedule a one-time event after the specified delay.
represents a Network Nack
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
fires when a NACK is received
signal::Signal< NotificationSubscriberBase > onTimeout
fires when no Notification is received within .getInterestLifetime period
Provide a communication channel with local or remote NDN forwarder.
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Represents an absolute name.
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
signal::Signal< NotificationSubscriberBase, Data > onDecodeError
fires when a Data packet in the Notification Stream cannot be decoded as Notification
const Interest & getInterest() const
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
construct a NotificationSubscriber
const Name & getName() const
void stop()
stop receiving notifications