NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.3: NDN, CCN, CCNx, content centric networks
API Documentation
stream-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28 
29 #include "transport.hpp"
30 #include "core/global-io.hpp"
31 
32 #include <queue>
33 
34 namespace nfd {
35 namespace face {
36 
// Implements Transport for stream-based protocols.
//
// The template argument supplies the socket type (Protocol::socket). The
// transport owns one such socket, a fixed-size receive buffer, and a FIFO
// queue of Blocks waiting to be written.
41 template<class Protocol>
42 class StreamTransport : public Transport
43 {
44 public:
45  typedef Protocol protocol;
46 
// Construct stream transport. The socket is taken by rvalue reference and
// moved into m_socket; the constructor body then calls startReceive().
51  explicit
52  StreamTransport(typename protocol::socket&& socket);
53 
54 protected:
// Performs Transport specific operations to close the transport.
55  virtual void
56  doClose() override;
57 
// Finishes the close: resets the send queue, closes the socket and sets
// the state to CLOSED.
58  void
59  deferredClose();
60 
// Performs Transport specific operations to send a packet: the packet's
// Block is appended to m_sendQueue.
61  virtual void
62  doSend(Transport::Packet&& packet) override;
63 
// Starts an asynchronous write of the Block at the front of m_sendQueue.
64  void
65  sendFromQueue();
66 
// Completion handler for the write started by sendFromQueue().
67  void
68  handleSend(const boost::system::error_code& error,
69  size_t nBytesSent);
70 
// Starts an asynchronous receive into the unused tail of m_receiveBuffer.
71  void
72  startReceive();
73 
// Completion handler for the receive started by startReceive().
74  void
75  handleReceive(const boost::system::error_code& error,
76  size_t nBytesReceived);
77 
// Filters out errors that occur while the transport is shutting down and
// forwards the rest to handleError().
78  void
79  processErrorCode(const boost::system::error_code& error);
80 
// Logs the error (except end-of-file), marks the transport FAILED and closes it.
// Virtual, so a derived transport can replace this behaviour.
81  virtual void
82  handleError(const boost::system::error_code& error);
83 
// NOTE(review): the method names on listing lines 85 and 88 are missing from
// this extract. The second is presumably resetSendQueue(), which
// deferredClose() calls; the first presumably resets the receive buffer
// (see the definition that sets m_receiveBufferSize to 0) -- confirm both
// against the original header.
84  void
86 
87  void
89 
90 protected:
// The stream socket used for all sends and receives.
91  typename protocol::socket m_socket;
92 
// NOTE(review): listing line 93 is missing from this extract -- TODO confirm
// what it declared.
94 
95 private:
// Raw bytes received from the socket; only the first m_receiveBufferSize
// bytes hold data that has not been consumed yet.
96  uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
97  size_t m_receiveBufferSize;
// Blocks waiting to be written; the front element is the one being sent
// and is popped by handleSend() once the write succeeds.
98  std::queue<Block> m_sendQueue;
99 };
100 
101 
102 template<class T>
103 StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
104  : m_socket(std::move(socket))
105  , m_receiveBufferSize(0)
106 {
107  startReceive();
108 }
109 
110 template<class T>
111 void
113 {
114  NFD_LOG_FACE_TRACE(__func__);
115 
116  if (m_socket.is_open()) {
117  // Cancel all outstanding operations and shutdown the socket
118  // so that no further sends or receives are possible.
119  // Use the non-throwing variants and ignore errors, if any.
120  boost::system::error_code error;
121  m_socket.cancel(error);
122  m_socket.shutdown(protocol::socket::shutdown_both, error);
123  }
124 
125  // Ensure that the Transport stays alive at least until
126  // all pending handlers are dispatched
128 
129  // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
130  //
131  // When doClose is called from a socket event handler (e.g., from handleReceive),
132  // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
133  // Instead, handleSend is invoked as nothing bad happened.
134  //
135  // In order to prevent the assertion in handleSend from failing, we clear the queue
136  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
137  // this point have been executed. If more send operations are scheduled after this
138  // point, they will fail because the socket has been shutdown, and their callbacks
139  // will be invoked with error code == asio::error::shut_down.
140 }
141 
142 template<class T>
143 void
145 {
146  NFD_LOG_FACE_TRACE(__func__);
147 
148  resetSendQueue();
149 
150  // use the non-throwing variant and ignore errors, if any
151  boost::system::error_code error;
152  m_socket.close(error);
153 
154  this->setState(TransportState::CLOSED);
155 }
156 
157 template<class T>
158 void
160 {
161  NFD_LOG_FACE_TRACE(__func__);
162 
163  if (getState() != TransportState::UP)
164  return;
165 
166  bool wasQueueEmpty = m_sendQueue.empty();
167  m_sendQueue.push(packet.packet);
168 
169  if (wasQueueEmpty)
170  sendFromQueue();
171 }
172 
173 template<class T>
174 void
176 {
177  boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
178  bind(&StreamTransport<T>::handleSend, this,
179  boost::asio::placeholders::error,
180  boost::asio::placeholders::bytes_transferred));
181 }
182 
183 template<class T>
184 void
185 StreamTransport<T>::handleSend(const boost::system::error_code& error,
186  size_t nBytesSent)
187 {
188  if (error)
189  return processErrorCode(error);
190 
191  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
192 
193  BOOST_ASSERT(!m_sendQueue.empty());
194  m_sendQueue.pop();
195 
196  if (!m_sendQueue.empty())
197  sendFromQueue();
198 }
199 
200 template<class T>
201 void
203 {
204  BOOST_ASSERT(getState() == TransportState::UP);
205 
206  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
207  ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
209  boost::asio::placeholders::error,
210  boost::asio::placeholders::bytes_transferred));
211 }
212 
213 template<class T>
214 void
215 StreamTransport<T>::handleReceive(const boost::system::error_code& error,
216  size_t nBytesReceived)
217 {
218  if (error)
219  return processErrorCode(error);
220 
221  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
222 
223  m_receiveBufferSize += nBytesReceived;
224 
225  size_t offset = 0;
226 
227  bool isOk = true;
228  Block element;
229  while (m_receiveBufferSize - offset > 0) {
230  std::tie(isOk, element) = Block::fromBuffer(m_receiveBuffer + offset, m_receiveBufferSize - offset);
231  if (!isOk)
232  break;
233 
234  offset += element.size();
235  BOOST_ASSERT(offset <= m_receiveBufferSize);
236 
237  this->receive(Transport::Packet(std::move(element)));
238  }
239 
240  if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
241  NFD_LOG_FACE_WARN("Failed to parse incoming packet or packet too large to process");
242  this->setState(TransportState::FAILED);
243  doClose();
244  return;
245  }
246 
247  if (offset > 0) {
248  if (offset != m_receiveBufferSize) {
249  std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
250  m_receiveBufferSize -= offset;
251  }
252  else {
253  m_receiveBufferSize = 0;
254  }
255  }
256 
257  startReceive();
258 }
259 
260 template<class T>
261 void
262 StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
263 {
264  NFD_LOG_FACE_TRACE(__func__);
265 
266  if (getState() == TransportState::CLOSING ||
267  getState() == TransportState::FAILED ||
268  getState() == TransportState::CLOSED ||
269  error == boost::asio::error::operation_aborted || // when cancel() is called
270  error == boost::asio::error::shut_down) // after shutdown() is called
271  // transport is shutting down, ignore any errors
272  return;
273 
274  handleError(error);
275 }
276 
277 template<class T>
278 void
279 StreamTransport<T>::handleError(const boost::system::error_code& error)
280 {
281  if (error != boost::asio::error::eof)
282  NFD_LOG_FACE_WARN("Send or receive operation failed: " << error.message());
283 
284  this->setState(TransportState::FAILED);
285  doClose();
286 }
287 
// Discards any buffered input by setting the receive buffer length to zero.
// The buffer contents themselves are left untouched.
// NOTE(review): the signature line (listing line 290) is missing from this
// extract; presumably this is the StreamTransport<T> member declared on
// listing line 85 that resets the receive buffer -- confirm its name against
// the original header.
288 template<class T>
289 void
291 {
292  m_receiveBufferSize = 0;
293 }
294 
295 template<class T>
296 void
298 {
299  std::queue<Block> emptyQueue;
300  std::swap(emptyQueue, m_sendQueue);
301 }
302 
303 } // namespace face
304 } // namespace nfd
305 
306 #endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
virtual void doClose() override
performs Transport specific operations to close the transport
void processErrorCode(const boost::system::error_code &error)
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Definition: face-log.hpp:74
virtual void handleError(const boost::system::error_code &error)
stores a packet along with the remote endpoint
Definition: transport.hpp:113
STL namespace.
detail::SimulatorIo & getGlobalIoService()
Definition: global-io.cpp:48
Class representing a wire element of NDN-TLV packet format.
Definition: block.hpp:43
Implements Transport for stream-based protocols.
size_t size() const
Definition: block.cpp:504
Copyright (c) 2011-2015 Regents of the University of California.
Definition: ndn-common.hpp:40
void post(const std::function< void()> &callback)
Definition: global-io.cpp:34
StreamTransport(typename protocol::socket &&socket)
Construct stream transport.
virtual void doSend(Transport::Packet &&packet) override
performs Transport specific operations to send a packet
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
the lower part of a Face
Definition: transport.hpp:104
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
Definition: face-log.hpp:83
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size
Definition: tlv.hpp:39