NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.0: NDN, CCN, CCNx, content centric networks
API Documentation
stream-face.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
26 #ifndef NFD_DAEMON_FACE_STREAM_FACE_HPP
27 #define NFD_DAEMON_FACE_STREAM_FACE_HPP
28 
29 #include "face.hpp"
30 #include "local-face.hpp"
31 #include "core/global-io.hpp"
32 
33 namespace nfd {
34 
35 // forward declaration
36 template<class T, class U, class V> struct StreamFaceSenderImpl;
37 
// Face implementation on top of a stream-oriented socket.
//
// Template parameters:
//   Protocol - supplies the socket type (Protocol::socket) used for all I/O.
//   FaceBase - base class of the face; defaults to Face.
//
// The object owns the socket, a fixed-size receive buffer that can hold
// ndn::MAX_NDN_PACKET_SIZE bytes, and a FIFO queue of encoded Blocks that
// are waiting to be written to the socket.
38 template<class Protocol, class FaceBase = Face>
39 class StreamFace : public FaceBase
40 {
41 public:
42  typedef Protocol protocol;
43 
    // Takes the already-created socket by value; the definition moves it into m_socket.
44  StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
45  typename protocol::socket socket, bool isOnDemand);
46 
47  // from FaceBase
48  void
49  sendInterest(const Interest& interest) DECL_OVERRIDE;
50 
51  void
52  sendData(const Data& data) DECL_OVERRIDE;
53 
54  void
55  close() DECL_OVERRIDE;
56 
57 protected:
    // Common handling for an error reported by a send or receive completion.
58  void
59  processErrorCode(const boost::system::error_code& error);
60 
    // Starts an asynchronous write of the Block at the front of m_sendQueue.
61  void
62  sendFromQueue();
63 
    // Completion handler for the write started by sendFromQueue().
64  void
65  handleSend(const boost::system::error_code& error,
66  size_t nBytesSent);
67 
    // Completion handler for m_socket.async_receive().
68  void
69  handleReceive(const boost::system::error_code& error,
70  size_t nBytesReceived);
71 
    // NOTE(review): the declarator that followed this `void` (listing line 73)
    // is missing from this listing. The member definitions call
    // shutdownSocket(), so that is presumably the missing declaration -
    // confirm against the original header.
72  void
74 
    // Clears the send queue and closes the socket; the argument is a
    // shared_ptr to the face supplied by the caller.
75  void
76  deferredClose(const shared_ptr<Face>& face);
77 
78 protected:
79  typename protocol::socket m_socket;
80 
82 
83 private:
    // Receive buffer and the number of bytes currently stored at its start.
84  uint8_t m_inputBuffer[ndn::MAX_NDN_PACKET_SIZE];
85  size_t m_inputBufferSize;
    // Encoded Blocks waiting to be written, oldest first.
86  std::queue<Block> m_sendQueue;
87 
    // The sender helpers push directly onto the private m_sendQueue.
88  friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
89  friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
90 };
91 
92 // All derived classes must use
93 // NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
94 
95 
// NOTE(review): the line that names this template struct (listing line 104)
// is missing from this listing.
//
// Generic case of a per-protocol socket check: validateSocket() takes the
// protocol's socket by const reference and has an empty body.
103 template<class Protocol, class U>
105 {
106  static void
107  validateSocket(const typename Protocol::socket& socket)
108  {
109  }
110 };
111 
112 
// Constructs the face.
//
// remoteUri / localUri are forwarded to the FaceBase constructor. The socket
// is taken by value and moved into m_socket. The receive buffer starts out
// empty (m_inputBufferSize == 0), and the first asynchronous receive is
// issued over the whole buffer before the constructor returns.
//
// NOTE(review): listing lines 123-124 and 127 are missing here. Line 127
// falls where the completion-handler argument of async_receive would be;
// whether isOnDemand is used on the missing lines cannot be told from this
// listing.
113 template<class T, class FaceBase>
114 inline
115 StreamFace<T, FaceBase>::StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
116  typename StreamFace::protocol::socket socket, bool isOnDemand)
117  : FaceBase(remoteUri, localUri)
118  , m_socket(std::move(socket))
119  , m_inputBufferSize(0)
120 {
121  NFD_LOG_FACE_INFO("Creating face");
122 
125 
126  m_socket.async_receive(boost::asio::buffer(m_inputBuffer, ndn::MAX_NDN_PACKET_SIZE),
128  boost::asio::placeholders::error,
129  boost::asio::placeholders::bytes_transferred));
130 }
131 
132 
// NOTE(review): the line that names this template struct (listing line 134)
// is missing from this listing; presumably it is the primary template of
// StreamFaceSenderImpl - confirm against the original header.
//
// send() appends the packet's wire encoding to the face's send queue. A write
// is started only when the queue was empty beforehand; when it was not empty,
// handleSend() keeps sending the remaining entries after each completed write.
133 template<class Protocol, class FaceBase, class Packet>
135 {
136  static void
137  send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
138  {
139  bool wasQueueEmpty = face.m_sendQueue.empty();
140  face.m_sendQueue.push(packet.wireEncode());
141 
142  if (wasQueueEmpty)
143  face.sendFromQueue();
144  }
145 };
146 
147 // partial specialization (only classes can be partially specialized)
// Sender used when the face's base class is LocalFace.
//
// When the packet's local control header is not empty after filtering, the
// filtered header is encoded and queued as its own Block ahead of the packet.
// The packet's wire encoding is always queued. A write is started only when
// the queue was empty before this call.
148 template<class Protocol, class Packet>
149 struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
150 {
151  static void
152  send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
153  {
     // sampled before anything is pushed, so it reflects the state on entry
154  bool wasQueueEmpty = face.m_sendQueue.empty();
155 
156  if (!face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
157  {
158  face.m_sendQueue.push(face.filterAndEncodeLocalControlHeader(packet));
159  }
160  face.m_sendQueue.push(packet.wireEncode());
161 
162  if (wasQueueEmpty)
163  face.sendFromQueue();
164  }
165 };
166 
167 
// NOTE(review): the declarator line (listing line 170) and the statement on
// listing line 174 are missing from this listing. The onSendInterest signal
// and the `interest` argument suggest this is the definition of
// StreamFace::sendInterest - confirm against the original header.
//
// Visible behavior: logs the function name at TRACE level and emits the
// onSendInterest signal with the Interest.
168 template<class T, class U>
169 inline void
171 {
172  NFD_LOG_FACE_TRACE(__func__);
173  this->emitSignal(onSendInterest, interest);
175 }
176 
// NOTE(review): the declarator line (listing line 179) and the statement on
// listing line 183 are missing from this listing. The onSendData signal and
// the `data` argument suggest this is the definition of StreamFace::sendData
// - confirm against the original header.
//
// Visible behavior: logs the function name at TRACE level and emits the
// onSendData signal with the Data packet.
177 template<class T, class U>
178 inline void
180 {
181  NFD_LOG_FACE_TRACE(__func__);
182  this->emitSignal(onSendData, data);
184 }
185 
// NOTE(review): the declarator line (listing line 188) is missing from this
// listing. The log message and failure reason suggest this is the definition
// of StreamFace::close - confirm against the original header.
//
// Does nothing when the socket is not open. Otherwise logs the closure, shuts
// the socket down via shutdownSocket(), and fails the face with the reason
// "Face closed".
186 template<class T, class U>
187 inline void
189 {
190  if (!m_socket.is_open())
191  return;
192 
193  NFD_LOG_FACE_INFO("Closing face");
194 
195  shutdownSocket();
196  this->fail("Face closed");
197 }
198 
// Handles an error code delivered to a send or receive completion handler.
//
//  - operation_aborted and shut_down are ignored.
//  - If the socket is no longer open, the face is failed with
//    "Connection closed" and the socket is left untouched.
//  - Otherwise the socket is shut down via shutdownSocket() and the face is
//    failed: with "Connection closed" for eof, or with the error's own
//    message for anything else. Errors other than eof are also logged at
//    WARN level first.
199 template<class T, class U>
200 inline void
201 StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
202 {
203  if (error == boost::asio::error::operation_aborted || // when cancel() is called
204  error == boost::asio::error::shut_down) // after shutdown() is called
205  return;
206 
207  if (!m_socket.is_open())
208  {
209  this->fail("Connection closed");
210  return;
211  }
212 
     // eof is not logged as a warning; it is reported below as "Connection closed"
213  if (error != boost::asio::error::eof)
214  NFD_LOG_FACE_WARN("Send or receive operation failed: " << error.message());
215 
216  shutdownSocket();
217 
218  if (error == boost::asio::error::eof)
219  this->fail("Connection closed");
220  else
221  this->fail(error.message());
222 }
223 
// NOTE(review): the declarator line (listing line 226) is missing from this
// listing. The body matches the sendFromQueue() member that the sender
// helpers and handleSend() call - confirm against the original header.
//
// Starts an asynchronous write of the Block at the front of m_sendQueue.
// The Block is not removed here; handleSend() is bound as the completion
// handler and pops it once the write has finished.
224 template<class T, class U>
225 inline void
227 {
228  boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
229  bind(&StreamFace<T, U>::handleSend, this,
230  boost::asio::placeholders::error,
231  boost::asio::placeholders::bytes_transferred));
232 }
233 
// Completion handler for the asynchronous write started by sendFromQueue().
//
// error      - result of the write; any error is passed to processErrorCode()
//              and nothing else is done.
// nBytesSent - number of bytes written; added to the NOutBytes counter.
//
// On success the front entry of the send queue is removed, and the next
// write is started if more entries are waiting.
234 template<class T, class U>
235 inline void
236 StreamFace<T, U>::handleSend(const boost::system::error_code& error,
237  size_t nBytesSent)
238 {
239  if (error)
240  return processErrorCode(error);
241 
     // the entry that was just written must still be at the front of the queue
242  BOOST_ASSERT(!m_sendQueue.empty());
243 
244  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
245  this->getMutableCounters().getNOutBytes() += nBytesSent;
246 
247  m_sendQueue.pop();
248  if (!m_sendQueue.empty())
249  sendFromQueue();
250 }
251 
// Completion handler for m_socket.async_receive().
//
// error          - result of the receive; any error is passed to
//                  processErrorCode() and nothing else is done.
// nBytesReceived - number of bytes appended to the receive buffer; added to
//                  the NInBytes counter and to m_inputBufferSize.
//
// The buffered bytes are parsed into Blocks from the start of the buffer.
// Each parsed Block is handed to decodeAndDispatchInput(); Blocks it rejects
// are logged and skipped. Parsing stops at the first position where
// Block::fromBuffer reports failure. Any unparsed bytes are moved to the
// start of the buffer, and another receive is issued into the space that
// remains.
//
// If the buffer is completely full and not even one Block could be parsed
// from it, no further progress is possible: the socket is shut down and the
// face is failed.
//
// NOTE(review): listing line 308, which falls where the completion-handler
// argument of the final async_receive would be, is missing from this listing.
252 template<class T, class U>
253 inline void
254 StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
255  size_t nBytesReceived)
256 {
257  if (error)
258  return processErrorCode(error);
259 
260  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
261  this->getMutableCounters().getNInBytes() += nBytesReceived;
262 
263  m_inputBufferSize += nBytesReceived;
264 
     // number of bytes at the start of the buffer consumed by parsed Blocks
265  size_t offset = 0;
266 
267  bool isOk = true;
268  Block element;
269  while (m_inputBufferSize - offset > 0) {
270  std::tie(isOk, element) = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset);
     // NOTE(review): presumably failure means an incomplete or malformed
     // element; fromBuffer's exact failure conditions are not visible here
271  if (!isOk)
272  break;
273 
274  offset += element.size();
275 
276  BOOST_ASSERT(offset <= m_inputBufferSize);
277 
278  if (!this->decodeAndDispatchInput(element)) {
279  NFD_LOG_FACE_WARN("Received unrecognized TLV block of type " << element.type());
280  // ignore unknown packet and proceed
281  }
282  }
283 
     // full buffer and nothing parsed: there is no room left to receive more bytes
284  if (!isOk && m_inputBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0)
285  {
286  NFD_LOG_FACE_WARN("Failed to parse incoming packet or packet too large to process");
287  shutdownSocket();
288  this->fail("Failed to parse incoming packet or packet too large to process");
289  return;
290  }
291 
292  if (offset > 0)
293  {
294  if (offset != m_inputBufferSize)
295  {
     // move the unparsed tail to the front; the destination starts before
     // the source range, so a forward std::copy is valid here
296  std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
297  m_inputBuffer);
298  m_inputBufferSize -= offset;
299  }
300  else
301  {
302  m_inputBufferSize = 0;
303  }
304  }
305 
     // receive into the free space that follows the bytes still buffered
306  m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
307  ndn::MAX_NDN_PACKET_SIZE - m_inputBufferSize),
309  boost::asio::placeholders::error,
310  boost::asio::placeholders::bytes_transferred));
311 }
312 
// NOTE(review): the declarator line (listing line 315) and the start of the
// call on listing line 328 are missing from this listing. The comment in the
// body names this function shutdownSocket.
//
// Cancels outstanding socket operations and shuts the socket down in both
// directions, ignoring any error from either call. The surviving argument
// line passes `this` together with this->shared_from_this() to the call whose
// start is missing; per the comments below, that call defers closing the
// socket (deferredClose) while keeping the Face alive until then.
313 template<class T, class U>
314 inline void
316 {
317  NFD_LOG_FACE_TRACE(__func__);
318 
319  // Cancel all outstanding operations and shutdown the socket
320  // so that no further sends or receives are possible.
321  // Use the non-throwing variants and ignore errors, if any.
322  boost::system::error_code error;
323  m_socket.cancel(error);
324  m_socket.shutdown(protocol::socket::shutdown_both, error);
325 
326  // ensure that the Face object is alive at least until all pending
327  // handlers are dispatched
329  this, this->shared_from_this()));
330 
331  // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
332  //
333  // When shutdownSocket is called from within a socket event (e.g., from handleReceive),
334  // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
335  // Instead, handleSend is invoked as if nothing bad had happened.
336  //
337  // In order to prevent the assertion in handleSend from failing, we clear the queue
338  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
339  // this point have been executed. If more send operations are scheduled after this
340  // point, they will fail because the socket has been shutdown, and their callbacks
341  // will be invoked with error code == asio::error::shut_down.
342 }
343 
// Final step of closing the face: discards every pending entry in the send
// queue and closes the socket, ignoring any error from close().
//
// face - not used in the body. The caller (shutdownSocket) passes
//        shared_from_this() for it; per the comment there, this keeps the
//        Face alive until pending handlers have been dispatched.
344 template<class T, class U>
345 inline void
346 StreamFace<T, U>::deferredClose(const shared_ptr<Face>& face)
347 {
348  NFD_LOG_FACE_TRACE(__func__);
349 
350  // clear send queue
     // (std::queue has no clear(); swapping with an empty queue drops all entries)
351  std::queue<Block> emptyQueue;
352  std::swap(emptyQueue, m_sendQueue);
353 
354  // use the non-throwing variant and ignore errors, if any
355  boost::system::error_code error;
356  m_socket.close(error);
357 }
358 
359 } // namespace nfd
360 
361 #endif // NFD_DAEMON_FACE_STREAM_FACE_HPP
Copyright (c) 2011-2015 Regents of the University of California.
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
static std::tuple< bool, Block > fromBuffer(ConstBufferPtr buffer, size_t offset)
Try to construct block from Buffer.
Definition: block.cpp:253
Copyright (c) 2013-2015 Regents of the University of California.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Definition: face.hpp:321
represents the underlying protocol and address used by a Face
Definition: face-uri.hpp:44
STL namespace.
Class representing a wire element of NDN-TLV packet format.
Definition: block.hpp:43
represents an Interest packet
Definition: interest.hpp:45
static void send(StreamFace< Protocol, LocalFace > &face, const Packet &packet)
void sendData(const Data &data) 1
StreamFace(const FaceUri &remoteUri, const FaceUri &localUri, typename protocol::socket socket, bool isOnDemand)
#define DECL_OVERRIDE
expands to 'override' if compiler supports this feature, otherwise expands to nothing ...
Definition: common.hpp:49
represents a face
Definition: face.hpp:57
size_t size() const
Definition: block.cpp:504
Class allowing validation of the StreamFace use.
#define emitSignal(...)
(implementation detail)
Definition: signal-emit.hpp:76
Protocol protocol
Definition: stream-face.hpp:42
#define NFD_LOG_FACE_INFO(msg)
Log a message at INFO level.
Definition: face.hpp:327
Copyright (c) 2011-2015 Regents of the University of California.
Definition: ndn-common.hpp:38
static void validateSocket(const typename Protocol::socket &socket)
void deferredClose(const shared_ptr< Face > &face)
protocol::socket m_socket
Definition: stream-face.hpp:79
void sendInterest(const Interest &interest) 1
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
uint32_t type() const
Definition: block.hpp:346
represents a face
Definition: local-face.hpp:40
boost::asio::io_service & getGlobalIoService()
Definition: global-io.hpp:35
void processErrorCode(const boost::system::error_code &error)
represents a Data packet
Definition: data.hpp:39
static void send(StreamFace< Protocol, FaceBase > &face, const Packet &packet)
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
Definition: face.hpp:330
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size
Definition: tlv.hpp:39