NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.0: NDN, CCN, CCNx, content centric networks
API Documentation
stream-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28 
29 #include "transport.hpp"
30 #include "core/global-io.hpp"
31 
32 #include <queue>
33 
34 namespace nfd {
35 namespace face {
36 
41 template<class Protocol>
42 class StreamTransport : public Transport
43 {
44 public:
45  typedef Protocol protocol;
46 
51  explicit
52  StreamTransport(typename protocol::socket&& socket);
53 
54 protected:
55  virtual void
57 
58  void
59  deferredClose();
60 
61  virtual void
63 
64  void
65  sendFromQueue();
66 
67  void
68  handleSend(const boost::system::error_code& error,
69  size_t nBytesSent);
70 
71  void
72  handleReceive(const boost::system::error_code& error,
73  size_t nBytesReceived);
74 
75  void
76  processErrorCode(const boost::system::error_code& error);
77 
78 protected:
79  typename protocol::socket m_socket;
80 
82 
83 private:
84  uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
85  size_t m_receiveBufferSize;
86  std::queue<Block> m_sendQueue;
87 };
88 
89 
90 template<class T>
91 StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
92  : m_socket(std::move(socket))
93  , m_receiveBufferSize(0)
94 {
95  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer, ndn::MAX_NDN_PACKET_SIZE),
97  boost::asio::placeholders::error,
98  boost::asio::placeholders::bytes_transferred));
99 }
100 
101 template<class T>
102 void
104 {
105  NFD_LOG_FACE_TRACE(__func__);
106 
107  if (m_socket.is_open()) {
108  // Cancel all outstanding operations and shutdown the socket
109  // so that no further sends or receives are possible.
110  // Use the non-throwing variants and ignore errors, if any.
111  boost::system::error_code error;
112  m_socket.cancel(error);
113  m_socket.shutdown(protocol::socket::shutdown_both, error);
114  }
115 
116  // Ensure that the Transport stays alive at least until
117  // all pending handlers are dispatched
119 
120  // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
121  //
122  // When doClose is called from a socket event handler (e.g., from handleReceive),
123  // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
124  // Instead, handleSend is invoked as if nothing bad happened.
125  //
126  // In order to prevent the assertion in handleSend from failing, we clear the queue
127  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
128  // this point have been executed. If more send operations are scheduled after this
129  // point, they will fail because the socket has been shut down, and their callbacks
130  // will be invoked with error code == asio::error::shut_down.
131 }
132 
133 template<class T>
134 void
136 {
137  NFD_LOG_FACE_TRACE(__func__);
138 
139  // clear send queue
140  std::queue<Block> emptyQueue;
141  std::swap(emptyQueue, m_sendQueue);
142 
143  // use the non-throwing variant and ignore errors, if any
144  boost::system::error_code error;
145  m_socket.close(error);
146 
148 }
149 
150 template<class T>
151 void
153 {
154  NFD_LOG_FACE_TRACE(__func__);
155 
156  bool wasQueueEmpty = m_sendQueue.empty();
157  m_sendQueue.push(packet.packet);
158 
159  if (wasQueueEmpty)
160  sendFromQueue();
161 }
162 
163 template<class T>
164 void
166 {
167  boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
168  bind(&StreamTransport<T>::handleSend, this,
169  boost::asio::placeholders::error,
170  boost::asio::placeholders::bytes_transferred));
171 }
172 
/**
 * Completion handler bound to the boost::asio::async_write call in this file
 * that transmits m_sendQueue.front().
 *
 * \param error      outcome of the write; a non-zero code is handed to
 *                   processErrorCode() and nothing else is done
 * \param nBytesSent number of bytes written; only used for the trace message
 */
173 template<class T>
174 void
175 StreamTransport<T>::handleSend(const boost::system::error_code& error,
176  size_t nBytesSent)
177 {
  // On failure the queue is left untouched: the front element is not popped here.
178  if (error)
179  return processErrorCode(error);
180 
181  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
182 
  // The element being popped is the one whose write just completed, so the
  // queue must still hold it. Per the note in the close path above, the queue
  // is cleared only in deferredClose so that this assertion cannot fire for a
  // handler that was already scheduled.
183  BOOST_ASSERT(!m_sendQueue.empty());
184  m_sendQueue.pop();
185 
  // Chain the next write only if something is still queued; otherwise the
  // next push onto an empty queue restarts sending.
186  if (!m_sendQueue.empty())
187  sendFromQueue();
188 }
189 
190 template<class T>
191 void
192 StreamTransport<T>::handleReceive(const boost::system::error_code& error,
193  size_t nBytesReceived)
194 {
195  if (error)
196  return processErrorCode(error);
197 
198  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
199 
200  m_receiveBufferSize += nBytesReceived;
201 
202  size_t offset = 0;
203 
204  bool isOk = true;
205  Block element;
206  while (m_receiveBufferSize - offset > 0) {
207  std::tie(isOk, element) = Block::fromBuffer(m_receiveBuffer + offset, m_receiveBufferSize - offset);
208  if (!isOk)
209  break;
210 
211  offset += element.size();
212  BOOST_ASSERT(offset <= m_receiveBufferSize);
213 
214  this->receive(Transport::Packet(std::move(element)));
215  }
216 
217  if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
218  NFD_LOG_FACE_WARN("Failed to parse incoming packet or packet too large to process");
220  doClose();
221  return;
222  }
223 
224  if (offset > 0) {
225  if (offset != m_receiveBufferSize) {
226  std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
227  m_receiveBufferSize -= offset;
228  }
229  else {
230  m_receiveBufferSize = 0;
231  }
232  }
233 
234  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
235  ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
237  boost::asio::placeholders::error,
238  boost::asio::placeholders::bytes_transferred));
239 }
240 
241 template<class T>
242 void
243 StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
244 {
245  NFD_LOG_FACE_TRACE(__func__);
246 
250  error == boost::asio::error::operation_aborted || // when cancel() is called
251  error == boost::asio::error::shut_down) // after shutdown() is called
252  // transport is shutting down, ignore any errors
253  return;
254 
255  if (error != boost::asio::error::eof)
256  NFD_LOG_FACE_WARN("Send or receive operation failed: " << error.message());
257 
259  doClose();
260 }
261 
262 } // namespace face
263 } // namespace nfd
264 
265 #endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
Copyright (c) 2011-2015 Regents of the University of California.
void processErrorCode(const boost::system::error_code &error)
static std::tuple< bool, Block > fromBuffer(ConstBufferPtr buffer, size_t offset)
Try to construct block from Buffer.
Definition: block.cpp:253
Copyright (c) 2013-2015 Regents of the University of California.
virtual void doSend(Transport::Packet &&packet) 1
performs Transport specific operations to send a packet
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Definition: face-log.hpp:74
stores a packet along with the remote endpoint
Definition: transport.hpp:113
STL namespace.
detail::SimulatorIo & getGlobalIoService()
Definition: global-io.cpp:48
Class representing a wire element of NDN-TLV packet format.
Definition: block.hpp:43
#define DECL_OVERRIDE
expands to 'override' if compiler supports 'override' specifier, otherwise expands to nothing ...
Definition: common.hpp:50
Implements Transport for stream-based protocols.
the transport is being closed due to a failure
size_t size() const
Definition: block.cpp:504
TransportState getState() const
Definition: transport.hpp:423
Copyright (c) 2011-2015 Regents of the University of California.
Definition: ndn-common.hpp:40
the transport is closed, and can be safely deallocated
void post(const std::function< void()> &callback)
Definition: global-io.cpp:34
StreamTransport(typename protocol::socket &&socket)
Construct stream transport.
virtual void doClose() 1
performs Transport specific operations to close the transport
the transport is requested to be closed
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
void setState(TransportState newState)
set transport state
Definition: transport.cpp:150
void receive(Packet &&packet)
receive a link-layer packet
Definition: transport.cpp:119
the lower part of a Face
Definition: transport.hpp:104
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
Definition: face-log.hpp:83
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size
Definition: tlv.hpp:39