NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.3: NDN, CCN, CCNx, content centric networks
API Documentation
notification-subscriber.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
48 #ifndef NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP
49 #define NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP
50 
51 #include "../face.hpp"
52 #include "signal.hpp"
53 #include "concepts.hpp"
54 #include "time.hpp"
55 #include "random.hpp"
56 #include "scheduler.hpp"
58 #include <boost/concept_check.hpp>
59 
60 namespace ndn {
61 namespace util {
62 
67 template<typename Notification>
68 class NotificationSubscriber : noncopyable
69 {
70 public:
71  BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<Notification>));
72  BOOST_CONCEPT_ASSERT((WireDecodable<Notification>));
73 
79  const time::milliseconds& interestLifetime = time::milliseconds(60000))
80  : m_face(face)
81  , m_prefix(prefix)
82  , m_isRunning(false)
83  , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
84  , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
85  , m_attempts(1)
86  , m_scheduler(face.getIoService())
87  , m_nackEvent(m_scheduler)
88  , m_interestLifetime(interestLifetime)
89  {
90  }
91 
92  virtual
94  {
95  }
96 
101  time::milliseconds
103  {
104  return m_interestLifetime;
105  }
106 
107  bool
108  isRunning() const
109  {
110  return m_isRunning;
111  }
112 
117  void
119  {
120  if (m_isRunning) // already running
121  return;
122  m_isRunning = true;
123 
124  this->sendInitialInterest();
125  }
126 
129  void
131  {
132  if (!m_isRunning) // not running
133  return;
134  m_isRunning = false;
135 
136  if (m_lastInterestId != 0)
137  m_face.removePendingInterest(m_lastInterestId);
138  m_lastInterestId = 0;
139  }
140 
141 public: // subscriptions
146 
150 
154 
158 
159 private:
160  void
161  sendInitialInterest()
162  {
163  if (this->shouldStop())
164  return;
165 
166  shared_ptr<Interest> interest = make_shared<Interest>(m_prefix);
167  interest->setMustBeFresh(true);
168  interest->setChildSelector(1);
169  interest->setInterestLifetime(getInterestLifetime());
170 
171  m_lastInterestId = m_face.expressInterest(*interest,
175  }
176 
177  void
178  sendNextInterest()
179  {
180  if (this->shouldStop())
181  return;
182 
183  BOOST_ASSERT(m_lastSequenceNo !=
184  std::numeric_limits<uint64_t>::max());// overflow or missing initial reply
185 
186  Name nextName = m_prefix;
187  nextName.appendSequenceNumber(m_lastSequenceNo + 1);
188 
189  shared_ptr<Interest> interest = make_shared<Interest>(nextName);
190  interest->setInterestLifetime(getInterestLifetime());
191 
192  m_lastInterestId = m_face.expressInterest(*interest,
196  }
197 
201  bool
202  shouldStop()
203  {
204  if (!m_isRunning)
205  return true;
206  if (onNotification.isEmpty() && onNack.isEmpty()) {
207  this->stop();
208  return true;
209  }
210  return false;
211  }
212 
213  void
214  afterReceiveData(const Data& data)
215  {
216  if (this->shouldStop())
217  return;
218 
219  Notification notification;
220  try {
221  m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
222  notification.wireDecode(data.getContent().blockFromValue());
223  }
224  catch (tlv::Error&) {
225  this->onDecodeError(data);
226  this->sendInitialInterest();
227  return;
228  }
229 
230  this->onNotification(notification);
231 
232  this->sendNextInterest();
233  }
234 
235  void
236  afterReceiveNack(const lp::Nack& nack)
237  {
238  if (this->shouldStop())
239  return;
240 
241  this->onNack(nack);
242 
243  time::milliseconds delay = exponentialBackoff(nack);
244  m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
245  }
246 
247  void
248  afterTimeout()
249  {
250  if (this->shouldStop())
251  return;
252 
253  this->onTimeout();
254 
255  this->sendInitialInterest();
256  }
257 
258  time::milliseconds
259  exponentialBackoff(lp::Nack nack)
260  {
261  uint64_t nackSequenceNo;
262 
263  try {
264  nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
265  }
266  catch (name::Component::Error&) {
267  nackSequenceNo = 0;
268  }
269 
270  if (m_lastNackSequenceNo == nackSequenceNo) {
271  ++m_attempts;
272  } else {
273  m_attempts = 1;
274  }
275 
276  time::milliseconds delayTime =
277  time::milliseconds (static_cast<uint32_t>( pow(2, m_attempts) * 100 + random::generateWord32() % 100));
278 
279  m_lastNackSequenceNo = nackSequenceNo;
280  return delayTime;
281  }
282 
283 private:
284  Face& m_face;
285  Name m_prefix;
286  bool m_isRunning;
287  uint64_t m_lastSequenceNo;
288  uint64_t m_lastNackSequenceNo;
289  uint64_t m_attempts;
290  util::scheduler::Scheduler m_scheduler;
291  util::scheduler::ScopedEventId m_nackEvent;
292  const PendingInterestId* m_lastInterestId;
293  time::milliseconds m_interestLifetime;
294 };
295 
296 } // namespace util
297 } // namespace ndn
298 
299 #endif // NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP
time::milliseconds getInterestLifetime() const
Copyright (c) 2011-2015 Regents of the University of California.
uint64_t toSequenceNumber() const
Interpret as sequence number component using NDN naming conventions.
const Block & getContent() const
Get content Block.
Definition: data.cpp:230
const Component & get(ssize_t i) const
Get the component at the given index.
Definition: name.hpp:411
NotificationSubscriber(Face &face, const Name &prefix, const time::milliseconds &interestLifetime=time::milliseconds(60000))
construct a NotificationSubscriber
const Name & getName() const
Get name of the Data packet.
Definition: data.hpp:318
STL namespace.
EventId scheduleEvent(const time::nanoseconds &after, const Event &event)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:58
void start()
start or resume receiving notifications
provides a lightweight signal / event system
signal::Signal< NotificationSubscriber > onTimeout
fires when no Notification is received within .getInterestLifetime period
signal::Signal< NotificationSubscriber, Notification > onNotification
fires when a Notification is received
signal::Signal< NotificationSubscriber, lp::Nack > onNack
fires when a NACK is received
Name & appendSequenceNumber(uint64_t seqNo)
Append sequence number using NDN naming conventions.
Definition: name.cpp:241
represents a Network Nack
Definition: nack.hpp:40
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
Definition: random.cpp:63
signal::Signal< NotificationSubscriber, Data > onDecodeError
fires when a Data packet in the Notification Stream cannot be decoded as Notification ...
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:125
provides a subscriber of Notification Stream
Name abstraction to represent an absolute name.
Definition: name.hpp:46
void stop()
stop receiving notifications
Event that is automatically cancelled upon destruction.
Block blockFromValue() const
Definition: block.cpp:437
const PendingInterestId * expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
Definition: face.cpp:132
represents a Data packet
Definition: data.hpp:37
a concept check for TLV abstraction with .wireDecode method and constructible from Block ...
Definition: concepts.hpp:70
const Interest & getInterest() const
Definition: nack.hpp:53
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
Definition: face.cpp:182
represents an error in TLV encoding or decoding
Definition: tlv.hpp:50
Error that can be thrown from name::Component.
const Name & getName() const
Definition: interest.hpp:215