NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.5: NDN, CCN, CCNx, content centric networks
API Documentation
stream-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28 
29 #include "transport.hpp"
30 #include "socket-utils.hpp"
31 #include "common/global.hpp"
32 
33 #include <queue>
34 
35 namespace nfd {
36 namespace face {
37 
42 template<class Protocol>
43 class StreamTransport : public Transport
44 {
45 public:
46  using protocol = Protocol;
47 
52  explicit
54 
55  ssize_t
56  getSendQueueLength() override;
57 
58 protected:
59  void
60  doClose() override;
61 
62  void
63  deferredClose();
64 
65  void
66  doSend(const Block& packet) override;
67 
68  void
69  sendFromQueue();
70 
71  void
72  handleSend(const boost::system::error_code& error,
73  size_t nBytesSent);
74 
75  void
76  startReceive();
77 
78  void
79  handleReceive(const boost::system::error_code& error,
80  size_t nBytesReceived);
81 
82  void
83  processErrorCode(const boost::system::error_code& error);
84 
85  virtual void
86  handleError(const boost::system::error_code& error);
87 
88  void
90 
91  void
93 
94  size_t
95  getSendQueueBytes() const;
96 
97 protected:
99 
101 
102 private:
103  uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
104  size_t m_receiveBufferSize;
105  std::queue<Block> m_sendQueue;
106  size_t m_sendQueueBytes;
107 };
108 
109 
110 template<class T>
112  : m_socket(std::move(socket))
113  , m_receiveBufferSize(0)
114  , m_sendQueueBytes(0)
115 {
116  // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
117  // Therefore, protecting against send queue overflows is less critical than in other transport
118  // types. Instead, we use the default threshold specified in the GenericLinkService options.
119 
120  startReceive();
121 }
122 
123 template<class T>
124 ssize_t
126 {
127  ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
128  if (queueLength == QUEUE_ERROR) {
129  NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
130  }
131  return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
132 }
133 
134 template<class T>
135 void
137 {
138  NFD_LOG_FACE_TRACE(__func__);
139 
140  if (m_socket.is_open()) {
141  // Cancel all outstanding operations and shutdown the socket
142  // so that no further sends or receives are possible.
143  // Use the non-throwing variants and ignore errors, if any.
144  boost::system::error_code error;
145  m_socket.cancel(error);
146  m_socket.shutdown(protocol::socket::shutdown_both, error);
147  }
148 
149  // Ensure that the Transport stays alive at least until
150  // all pending handlers are dispatched
151  getGlobalIoService().post([this] { deferredClose(); });
152 
153  // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
154  //
155  // When doClose is called from a socket event handler (e.g., from handleReceive),
156  // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
157  // Instead, handleSend is invoked as nothing bad happened.
158  //
159  // In order to prevent the assertion in handleSend from failing, we clear the queue
160  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
161  // this point have been executed. If more send operations are scheduled after this
162  // point, they will fail because the socket has been shutdown, and their callbacks
163  // will be invoked with error code == asio::error::shut_down.
164 }
165 
166 template<class T>
167 void
169 {
170  NFD_LOG_FACE_TRACE(__func__);
171 
172  resetSendQueue();
173 
174  // use the non-throwing variant and ignore errors, if any
175  boost::system::error_code error;
176  m_socket.close(error);
177 
178  this->setState(TransportState::CLOSED);
179 }
180 
181 template<class T>
182 void
184 {
185  NFD_LOG_FACE_TRACE(__func__);
186 
187  if (getState() != TransportState::UP)
188  return;
189 
190  bool wasQueueEmpty = m_sendQueue.empty();
191  m_sendQueue.push(packet);
192  m_sendQueueBytes += packet.size();
193 
194  if (wasQueueEmpty)
195  sendFromQueue();
196 }
197 
198 template<class T>
199 void
201 {
202  boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
203  [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
204 }
205 
206 template<class T>
207 void
208 StreamTransport<T>::handleSend(const boost::system::error_code& error,
209  size_t nBytesSent)
210 {
211  if (error)
212  return processErrorCode(error);
213 
214  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
215 
216  BOOST_ASSERT(!m_sendQueue.empty());
217  BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
218  m_sendQueueBytes -= nBytesSent;
219  m_sendQueue.pop();
220 
221  if (!m_sendQueue.empty())
222  sendFromQueue();
223 }
224 
225 template<class T>
226 void
228 {
229  BOOST_ASSERT(getState() == TransportState::UP);
230 
231  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
232  ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
233  [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
234 }
235 
236 template<class T>
237 void
238 StreamTransport<T>::handleReceive(const boost::system::error_code& error,
239  size_t nBytesReceived)
240 {
241  if (error)
242  return processErrorCode(error);
243 
244  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
245 
246  m_receiveBufferSize += nBytesReceived;
247  auto bufferView = ndn::make_span(m_receiveBuffer, m_receiveBufferSize);
248  size_t offset = 0;
249  bool isOk = true;
250  while (offset < bufferView.size()) {
251  Block element;
252  std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
253  if (!isOk)
254  break;
255 
256  offset += element.size();
257  BOOST_ASSERT(offset <= bufferView.size());
258 
259  this->receive(element);
260  }
261 
262  if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
263  NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
264  this->setState(TransportState::FAILED);
265  doClose();
266  return;
267  }
268 
269  if (offset > 0) {
270  if (offset != m_receiveBufferSize) {
271  std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
272  m_receiveBufferSize -= offset;
273  }
274  else {
275  m_receiveBufferSize = 0;
276  }
277  }
278 
279  startReceive();
280 }
281 
282 template<class T>
283 void
284 StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
285 {
286  NFD_LOG_FACE_TRACE(__func__);
287 
288  if (getState() == TransportState::CLOSING ||
289  getState() == TransportState::FAILED ||
290  getState() == TransportState::CLOSED ||
291  error == boost::asio::error::operation_aborted || // when cancel() is called
292  error == boost::asio::error::shut_down) // after shutdown() is called
293  // transport is shutting down, ignore any errors
294  return;
295 
296  handleError(error);
297 }
298 
299 template<class T>
300 void
301 StreamTransport<T>::handleError(const boost::system::error_code& error)
302 {
303  if (error == boost::asio::error::eof) {
304  this->setState(TransportState::CLOSING);
305  }
306  else {
307  NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
308  this->setState(TransportState::FAILED);
309  }
310  doClose();
311 }
312 
313 template<class T>
314 void
316 {
317  m_receiveBufferSize = 0;
318 }
319 
320 template<class T>
321 void
323 {
324  std::queue<Block> emptyQueue;
325  std::swap(emptyQueue, m_sendQueue);
326  m_sendQueueBytes = 0;
327 }
328 
329 template<class T>
330 size_t
332 {
333  return m_sendQueueBytes;
334 }
335 
336 } // namespace face
337 } // namespace nfd
338 
339 #endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
void swap(any &x, any &y) any_noexcept
Definition: any-lite.hpp:614
void doClose() override
performs Transport specific operations to close the transport
void doSend(const Block &packet) override
performs Transport specific operations to send a packet
void processErrorCode(const boost::system::error_code &error)
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
virtual void handleError(const boost::system::error_code &error)
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
STL namespace.
const ssize_t QUEUE_ERROR
indicates that the transport was unable to retrieve the queue capacity/length
Definition: transport.hpp:103
detail::SimulatorIo & getGlobalIoService()
Returns the global io_service instance for the calling thread.
Definition: global.cpp:49
Represents a TLV element of the NDN packet format.
Definition: block.hpp:44
Implements Transport for stream-based protocols.
size_t size() const
Return the size of the encoded wire, i.e., of the whole TLV.
Definition: block.cpp:294
Copyright (c) 2011-2015 Regents of the University of California.
Definition: ndn-common.hpp:39
void post(const std::function< void()> &callback)
Definition: global.cpp:35
StreamTransport(typename protocol::socket &&socket)
Construct stream transport.
ssize_t getTxQueueLength(int fd)
obtain send queue length from a specified system socket
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
CFReleaser< CFStringRef > fromBuffer(const uint8_t *buf, size_t buflen)
Create a CFString by copying bytes from a raw buffer.
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
Catch-all error for socket component errors that don&#39;t fit in other categories.
Definition: base.hpp:83
ssize_t getSendQueueLength() override
The lower half of a Face.
Definition: transport.hpp:108
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
const size_t MAX_NDN_PACKET_SIZE
Practical size limit of a network-layer packet.
Definition: tlv.hpp:41