39 , m_lastSequenceNo(
std::numeric_limits<uint64_t>::max())
40 , m_lastNackSequenceNo(
std::numeric_limits<uint64_t>::max())
42 , m_scheduler(face.getIoService())
43 , m_nackEvent(m_scheduler)
44 , m_interestLifetime(interestLifetime)
57 this->sendInitialInterest();
67 if (m_lastInterestId != 0)
73 NotificationSubscriberBase::sendInitialInterest()
75 if (this->shouldStop())
78 auto interest = make_shared<Interest>(m_prefix);
79 interest->setMustBeFresh(
true);
80 interest->setChildSelector(1);
84 bind(&NotificationSubscriberBase::afterReceiveData,
this, _2),
85 bind(&NotificationSubscriberBase::afterReceiveNack,
this, _2),
86 bind(&NotificationSubscriberBase::afterTimeout,
this));
90 NotificationSubscriberBase::sendNextInterest()
92 if (this->shouldStop())
95 BOOST_ASSERT(m_lastSequenceNo != std::numeric_limits<uint64_t>::max());
97 Name nextName = m_prefix;
100 auto interest = make_shared<Interest>(nextName);
104 bind(&NotificationSubscriberBase::afterReceiveData,
this, _2),
105 bind(&NotificationSubscriberBase::afterReceiveNack,
this, _2),
106 bind(&NotificationSubscriberBase::afterTimeout,
this));
110 NotificationSubscriberBase::shouldStop()
114 if (!this->hasSubscriber() &&
onNack.isEmpty()) {
122 NotificationSubscriberBase::afterReceiveData(
const Data& data)
124 if (this->shouldStop())
132 this->sendInitialInterest();
136 if (!this->decodeAndDeliver(data)) {
138 this->sendInitialInterest();
142 this->sendNextInterest();
146 NotificationSubscriberBase::afterReceiveNack(
const lp::Nack& nack)
148 if (this->shouldStop())
154 m_nackEvent = m_scheduler.
scheduleEvent(delay, [
this] {this->sendInitialInterest();});
158 NotificationSubscriberBase::afterTimeout()
160 if (this->shouldStop())
165 this->sendInitialInterest();
169 NotificationSubscriberBase::exponentialBackoff(
lp::Nack nack)
171 uint64_t nackSequenceNo;
180 if (m_lastNackSequenceNo == nackSequenceNo) {
187 m_lastNackSequenceNo = nackSequenceNo;
void start()
start or resume receiving notifications
const Name & getName() const
Copyright (c) 2011-2015 Regents of the University of California.
virtual ~NotificationSubscriberBase()
time::milliseconds getInterestLifetime() const
boost::posix_time::time_duration milliseconds(long duration)
const Interest & getInterest() const
EventId scheduleEvent(const time::nanoseconds &after, const Event &event)
Schedule a one-time event after the specified delay.
represents a Network Nack
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
fires when a NACK is received
signal::Signal< NotificationSubscriberBase > onTimeout
fires when no Notification is received within the getInterestLifetime() period
Provide a communication channel with local or remote NDN forwarder.
void delay(websocketpp::connection_hdl, long duration)
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Represents an absolute name.
const Name & getName() const
Get name.
signal::Signal< NotificationSubscriberBase, Data > onDecodeError
fires when a Data packet in the Notification Stream cannot be decoded as a Notification
uint64_t toSequenceNumber() const
Interpret as sequence number component using NDN naming conventions.
const PendingInterestId * expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
Represents a Data packet.
const Component & get(ssize_t i) const
Get the component at the given index.
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
represents an error in TLV encoding or decoding
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
construct a NotificationSubscriber
void stop()
stop receiving notifications