NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.0: NDN, CCN, CCNx, content centric networks
API Documentation
stream-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
22 #ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
23 #define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
24 
25 #include "transport.hpp"
26 
27 #include <boost/asio.hpp>
28 #include <list>
29 
30 namespace ndn {
31 
32 template<class BaseTransport, class Protocol>
34 {
35 public:
37 
38  typedef std::list<Block> BlockSequence;
39  typedef std::list<BlockSequence> TransmissionQueue;
40 
// Constructs the implementation object for a stream transport.
//
// Keeps a reference to the owning transport, creates the socket and the connect timer
// on ioService, and starts with no connection attempt in progress.
//
// NOTE(review): listing line 44 is absent from this page, so one member initializer is
// probably not shown here - confirm against the real header.
41  StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
42  : m_transport(transport)
43  , m_socket(ioService)
45  , m_connectionInProgress(false)
46  , m_connectTimer(ioService)
47  {
48  }
49 
// Completion handler bound to m_socket.async_connect().
//
// Always clears the "connection in progress" flag and cancels the connect timer.
// On success: calls resume(), marks the transport connected, and, if anything is already
// queued, starts writing the first queued block sequence (handleAsyncWrite continues
// with the rest of the queue).
// On failure: marks the transport disconnected, closes it, and throws Transport::Error
// carrying the connect error code.
50  void
51  connectHandler(const boost::system::error_code& error)
52  {
53  m_connectionInProgress = false;
54  m_connectTimer.cancel();
55 
56  if (!error)
57  {
58  resume();
59  m_transport.m_isConnected = true;
60 
// Flush blocks that were queued by send() while the connection was being set up.
61  if (!m_transmissionQueue.empty()) {
62  boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
63  bind(&Impl::handleAsyncWrite, this, _1,
64  m_transmissionQueue.begin()));
65  }
66  }
67  else
68  {
69  // connection failed: tear the transport down, then report the error by throwing
70  m_transport.m_isConnected = false;
71  m_transport.close();
72  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
73  }
74  }
75 
// Wait handler bound to m_connectTimer.
//
// Does nothing when the wait completed with an error (e.g., the timer was cancelled).
// Otherwise the timer expired: closes the transport and throws Transport::Error.
//
// NOTE(review): on the throwing path `error` is always a non-error code (the early return
// handles every error), so the exception carries no failure code and only the generic
// "error while connecting" text; a timeout-specific message may be clearer.
76  void
77  connectTimeoutHandler(const boost::system::error_code& error)
78  {
79  if (error) // e.g., cancelled timer
80  return;
81 
82  m_transport.close();
83  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
84  }
85 
// Starts an asynchronous connection to the given endpoint.
//
// Arms m_connectTimer for 4 seconds (connectTimeoutHandler runs when the wait completes),
// opens the socket, and issues async_connect() with connectHandler as completion handler.
//
// NOTE(review): listing lines 89-90 and 93 are absent from this page. The extra closing
// brace below suggests a guarding block is opened on the missing lines - presumably the
// same m_connectionInProgress check used by the resolver-based connect(); confirm against
// the real header.
86  void
87  connect(const typename Protocol::endpoint& endpoint)
88  {
91 
92  // Wait at most 4 seconds to connect
94  m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
95  m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
96 
97  m_socket.open();
98  m_socket.async_connect(endpoint,
99  bind(&Impl::connectHandler, this, _1));
100  }
101  }
102 
// NOTE(review): the line carrying this method's name (listing line 104) is absent from
// this page; judging by the body this is presumably the implementation's close() - confirm.
//
// Resets all connection state: clears the "connection in progress" flag, cancels the
// connect timer, cancels outstanding socket operations and closes the socket (errors from
// these three calls are written to a local error_code and ignored), marks the transport
// as neither connected nor expecting data, and drops everything in the transmission queue.
103  void
105  {
106  m_connectionInProgress = false;
107 
108  boost::system::error_code error; // to silently ignore all errors
109  m_connectTimer.cancel(error);
110  m_socket.cancel(error);
111  m_socket.close(error);
112 
113  m_transport.m_isConnected = false;
114  m_transport.m_isExpectingData = false;
115  m_transmissionQueue.clear();
116  }
117 
// NOTE(review): the method-name line (listing line 119) and listing line 121 are absent
// from this page. The `return;` below is presumably guarded by a condition on the missing
// line 121 - confirm against the real header.
//
// If the transport is currently expecting data, clears that flag and cancels the
// outstanding operations on the socket.
118  void
120  {
122  return;
123 
124  if (m_transport.m_isExpectingData)
125  {
126  m_transport.m_isExpectingData = false;
127  m_socket.cancel();
128  }
129  }
130 
// NOTE(review): the method-name line (listing line 135) and listing line 137 are absent
// from this page. This is presumably resume(), which connectHandler() calls, and the
// `return;` below is presumably guarded by a condition on the missing line 137 - confirm.
//
// If the transport is not yet expecting data, sets that flag, resets the count of
// buffered input bytes to zero, and starts an asynchronous receive into m_inputBuffer
// (up to MAX_NDN_PACKET_SIZE bytes) with handleAsyncReceive as completion handler.
134  void
136  {
138  return;
139 
140  if (!m_transport.m_isExpectingData)
141  {
142  m_transport.m_isExpectingData = true;
143  m_inputBufferSize = 0;
144  m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
145  bind(&Impl::handleAsyncReceive, this, _1, _2));
146  }
147  }
148 
// Queues a single block for transmission.
//
// The block is wrapped in a one-element BlockSequence and appended to the transmission
// queue. A write is started immediately only when the transport is connected and this is
// the sole queued item; otherwise an already-running write chain or connectHandler will
// pick it up.
149  void
150  send(const Block& wire)
151  {
152  BlockSequence sequence;
153  sequence.push_back(wire);
154  m_transmissionQueue.push_back(sequence);
155 
156  if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
157  boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
158  bind(&Impl::handleAsyncWrite, this, _1,
159  m_transmissionQueue.begin()));
160  }
161 
162  // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
163  // next write will be scheduled either in connectHandler or in handleAsyncWrite
164  }
165 
// Queues a header block followed by a payload block for transmission as one sequence.
//
// Both blocks are placed, in that order, in a single BlockSequence that is appended to
// the transmission queue. A write is started immediately only when the transport is
// connected and this is the sole queued item; otherwise an already-running write chain or
// connectHandler will pick it up.
166  void
167  send(const Block& header, const Block& payload)
168  {
169  BlockSequence sequence;
170  sequence.push_back(header);
171  sequence.push_back(payload);
172  m_transmissionQueue.push_back(sequence);
173 
174  if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
175  boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
176  bind(&Impl::handleAsyncWrite, this, _1,
177  m_transmissionQueue.begin()));
178  }
179 
180  // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
181  // next write will be scheduled either in connectHandler or in handleAsyncWrite
182  }
183 
// Completion handler for boost::asio::async_write() on one queued block sequence.
//
// error     - result of the write.
// queueItem - iterator to the queue entry that was being written.
//
// A cancelled write is ignored. Any other error closes the transport and throws
// Transport::Error. On success the written entry is removed from the queue and, if more
// entries remain, the write of the next one is started.
184  void
185  handleAsyncWrite(const boost::system::error_code& error,
186  TransmissionQueue::iterator queueItem)
187  {
188  if (error)
189  {
190  if (error == boost::system::errc::operation_canceled) {
191  // async write has been explicitly cancelled (e.g., socket close)
192  return;
193  }
194 
195  m_transport.close();
196  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
197  }
198 
199  if (!m_transport.m_isConnected) {
200  return; // queue has been already cleared
201  }
202 
203  m_transmissionQueue.erase(queueItem);
204 
// Continue with the next queued sequence, if any.
205  if (!m_transmissionQueue.empty()) {
206  boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
207  bind(&Impl::handleAsyncWrite, this, _1,
208  m_transmissionQueue.begin()));
209  }
210  }
211 
// Decodes and delivers consecutive blocks from a byte buffer.
//
// buffer          - start of the buffered bytes.
// offset          - in/out: position of the first byte not yet decoded; advanced by the
//                   size of every block that is delivered.
// nBytesAvailable - number of valid bytes in buffer.
//
// Each block that Block::fromBuffer() reports as decoded is passed to m_transport.receive().
// Returns true when offset has reached nBytesAvailable, false as soon as fromBuffer()
// reports failure for the remaining bytes (offset then still points at those bytes).
212  bool
213  processAll(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
214  {
215  while (offset < nBytesAvailable) {
216  bool isOk = false;
217  Block element;
218  std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
219  if (!isOk)
220  return false;
221 
222  m_transport.receive(element);
223  offset += element.size();
224  }
225  return true;
226  }
227 
// Completion handler for m_socket.async_receive().
//
// error       - result of the receive.
// nBytesRecvd - number of bytes appended to m_inputBuffer by this receive.
//
// A cancelled receive is ignored. Any other error closes the transport and throws
// Transport::Error. Otherwise the newly received bytes are added to the buffered count,
// every decodable block is delivered via processAll(), the undecoded remainder is moved
// to the front of the buffer, and the next receive is started behind it.
//
// NOTE(review): listing line 270 is absent from this page, so the remaining arguments of
// the final boost::asio::buffer(...) / async_receive(...) call are not shown here.
228  void
229  handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
230  {
231  if (error)
232  {
233  if (error == boost::system::errc::operation_canceled) {
234  // async receive has been explicitly cancelled (e.g., socket close)
235  return;
236  }
237 
238  m_transport.close();
239  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
240  }
241 
242  m_inputBufferSize += nBytesRecvd;
243  // decode and deliver every complete block currently held in the input buffer
244 
245  std::size_t offset = 0;
// hasProcessedSome is true only when processAll() consumed the whole buffer.
246  bool hasProcessedSome = processAll(m_inputBuffer, offset, m_inputBufferSize);
// A completely full buffer from which not even the first block decodes is unrecoverable.
247  if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
248  {
249  m_transport.close();
250  BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
251  "input buffer full, but a valid TLV cannot be "
252  "decoded"));
253  }
254 
// Discard the consumed prefix; keep any trailing partial block at the buffer start.
255  if (offset > 0)
256  {
257  if (offset != m_inputBufferSize)
258  {
259  std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
260  m_inputBuffer);
261  m_inputBufferSize -= offset;
262  }
263  else
264  {
265  m_inputBufferSize = 0;
266  }
267  }
268 
269  m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
271  bind(&Impl::handleAsyncReceive, this, _1, _2));
272  }
273 
274 protected:
275  BaseTransport& m_transport;
276 
277  typename Protocol::socket m_socket;
280 
281  TransmissionQueue m_transmissionQueue;
283 
284  boost::asio::deadline_timer m_connectTimer;
285 };
286 
287 
288 template<class BaseTransport, class Protocol>
289 class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
290 {
291 public:
293 
// Constructs the resolver-capable implementation; both arguments are forwarded unchanged
// to the StreamTransportImpl base-class constructor.
294  StreamTransportWithResolverImpl(BaseTransport& transport, boost::asio::io_service& ioService)
295  : StreamTransportImpl<BaseTransport, Protocol>(transport, ioService)
296  {
297  }
298 
// Completion handler for the async_resolve() call issued by connect(query).
//
// error    - result of the resolution.
// endpoint - iterator to the first resolved endpoint (equal to a default-constructed
//            iterator when nothing was resolved).
// (unnamed shared_ptr) - the resolver bound into the handler; not used in the body.
//
// A cancelled resolution is ignored. Any other resolution error, or an empty result,
// closes the transport and throws Transport::Error. Otherwise an asynchronous connect to
// the first resolved endpoint is started, with connectHandler as completion handler.
299  void
300  resolveHandler(const boost::system::error_code& error,
301  typename Protocol::resolver::iterator endpoint,
302  const shared_ptr<typename Protocol::resolver>&)
303  {
304  if (error)
305  {
306  if (error == boost::system::errc::operation_canceled)
307  return;
// Tear the failed attempt down before reporting it, exactly as connectHandler() and the
// empty-result branch below do. NOTE(review): presumably this also stops the connect
// timer armed by connect() and clears the in-progress flag - confirm in BaseTransport::close().
308  this->m_transport.close();
309  BOOST_THROW_EXCEPTION(Transport::Error(error, "Error during resolution of host or port"));
310  }
311 
// A default-constructed resolver iterator is the end marker: nothing was resolved.
312  typename Protocol::resolver::iterator end;
313  if (endpoint == end)
314  {
315  this->m_transport.close();
316  BOOST_THROW_EXCEPTION(Transport::Error(error, "Unable to resolve host or port"));
317  }
318 
319  this->m_socket.async_connect(*endpoint,
320  bind(&Impl::connectHandler, this, _1));
321  }
322 
// Starts an asynchronous connection to the host/port described by a resolver query.
//
// Does nothing when a connection attempt is already in progress. Otherwise marks the
// attempt as in progress, arms m_connectTimer for 4 seconds (connectTimeoutHandler runs
// when the wait completes), and starts an asynchronous name resolution whose result is
// delivered to resolveHandler.
323  void
324  connect(const typename Protocol::resolver::query& query)
325  {
326  if (!this->m_connectionInProgress) {
327  this->m_connectionInProgress = true;
328 
329  // Wait at most 4 seconds to connect
331  this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
332  this->m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
333 
334  // typename boost::asio::ip::basic_resolver< Protocol > resolver;
// The resolver is created on the socket's io_service and held by shared_ptr.
335  shared_ptr<typename Protocol::resolver> resolver =
336  make_shared<typename Protocol::resolver>(ref(this->m_socket.get_io_service()));
337 
// The shared_ptr is bound into the handler, presumably to keep the resolver alive until
// the resolution completes.
338  resolver->async_resolve(query, bind(&Impl::resolveHandler, this, _1, _2, resolver));
339  }
340  }
341 };
342 
343 
344 } // namespace ndn
345 
346 #endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP
void connectTimeoutHandler(const boost::system::error_code &error)
Copyright (c) 2011-2015 Regents of the University of California.
static std::tuple< bool, Block > fromBuffer(ConstBufferPtr buffer, size_t offset)
Try to construct block from Buffer.
Definition: block.cpp:253
StreamTransportImpl(BaseTransport &transport, boost::asio::io_service &ioService)
Class representing a wire element of NDN-TLV packet format.
Definition: block.hpp:43
StreamTransportImpl< BaseTransport, Protocol > Impl
TransmissionQueue m_transmissionQueue
void handleAsyncReceive(const boost::system::error_code &error, std::size_t nBytesRecvd)
void send(const Block &wire)
Table::const_iterator iterator
Definition: cs-internal.hpp:41
size_t size() const
Definition: block.cpp:504
void connect(const typename Protocol::resolver::query &query)
void connect(const typename Protocol::endpoint &endpoint)
void handleAsyncWrite(const boost::system::error_code &error, TransmissionQueue::iterator queueItem)
void resolveHandler(const boost::system::error_code &error, typename Protocol::resolver::iterator endpoint, const shared_ptr< typename Protocol::resolver > &)
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE]
StreamTransportWithResolverImpl< BaseTransport, Protocol > Impl
void send(const Block &header, const Block &payload)
boost::asio::deadline_timer m_connectTimer
bool processAll(uint8_t *buffer, size_t &offset, size_t nBytesAvailable)
std::list< Block > BlockSequence
std::list< BlockSequence > TransmissionQueue
void connectHandler(const boost::system::error_code &error)
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size
Definition: tlv.hpp:39
StreamTransportWithResolverImpl(BaseTransport &transport, boost::asio::io_service &ioService)