NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.0: NDN, CCN, CCNx, content centric networks
API Documentation
stream-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
22 #ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
23 #define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
24 
25 #include "transport.hpp"
26 
27 #include <list>
28 
29 namespace ndn {
30 
31 template<class BaseTransport, class Protocol>
33 {
34 public:
36 
37  typedef std::list<Block> BlockSequence;
38  typedef std::list<BlockSequence> TransmissionQueue;
39 
40  StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
41  : m_transport(transport)
42  , m_socket(ioService)
44  , m_connectionInProgress(false)
45  , m_connectTimer(ioService)
46  {
47  }
48 
49  void
50  connectHandler(const boost::system::error_code& error)
51  {
52  m_connectionInProgress = false;
53  m_connectTimer.cancel();
54 
55  if (!error)
56  {
57  resume();
58  m_transport.m_isConnected = true;
59 
60  if (!m_transmissionQueue.empty()) {
61  boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
62  bind(&Impl::handleAsyncWrite, this, _1,
63  m_transmissionQueue.begin()));
64  }
65  }
66  else
67  {
68  // may need to throw exception
69  m_transport.m_isConnected = false;
70  m_transport.close();
71  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
72  }
73  }
74 
75  void
76  connectTimeoutHandler(const boost::system::error_code& error)
77  {
78  if (error) // e.g., cancelled timer
79  return;
80 
81  m_transport.close();
82  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
83  }
84 
85  void
86  connect(const typename Protocol::endpoint& endpoint)
87  {
90 
91  // Wait at most 4 seconds to connect
93  m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
94  m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
95 
96  m_socket.open();
97  m_socket.async_connect(endpoint,
98  bind(&Impl::connectHandler, this, _1));
99  }
100  }
101 
102  void
104  {
105  m_connectionInProgress = false;
106 
107  boost::system::error_code error; // to silently ignore all errors
108  m_connectTimer.cancel(error);
109  m_socket.cancel(error);
110  m_socket.close(error);
111 
112  m_transport.m_isConnected = false;
113  m_transport.m_isExpectingData = false;
114  m_transmissionQueue.clear();
115  }
116 
117  void
119  {
121  return;
122 
123  if (m_transport.m_isExpectingData)
124  {
125  m_transport.m_isExpectingData = false;
126  m_socket.cancel();
127  }
128  }
129 
133  void
135  {
137  return;
138 
139  if (!m_transport.m_isExpectingData)
140  {
141  m_transport.m_isExpectingData = true;
142  m_inputBufferSize = 0;
143  m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
144  bind(&Impl::handleAsyncReceive, this, _1, _2));
145  }
146  }
147 
148  void
149  send(const Block& wire)
150  {
151  BlockSequence sequence;
152  sequence.push_back(wire);
153  m_transmissionQueue.push_back(sequence);
154 
155  if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
156  boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
157  bind(&Impl::handleAsyncWrite, this, _1,
158  m_transmissionQueue.begin()));
159  }
160 
161  // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
162  // next write will be scheduled either in connectHandler or in asyncWriteHandler
163  }
164 
165  void
166  send(const Block& header, const Block& payload)
167  {
168  BlockSequence sequence;
169  sequence.push_back(header);
170  sequence.push_back(payload);
171  m_transmissionQueue.push_back(sequence);
172 
173  if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
174  boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
175  bind(&Impl::handleAsyncWrite, this, _1,
176  m_transmissionQueue.begin()));
177  }
178 
179  // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
180  // next write will be scheduled either in connectHandler or in asyncWriteHandler
181  }
182 
183  void
184  handleAsyncWrite(const boost::system::error_code& error,
185  TransmissionQueue::iterator queueItem)
186  {
187  if (error)
188  {
189  if (error == boost::system::errc::operation_canceled) {
190  // async receive has been explicitly cancelled (e.g., socket close)
191  return;
192  }
193 
194  m_transport.close();
195  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
196  }
197 
198  if (!m_transport.m_isConnected) {
199  return; // queue has been already cleared
200  }
201 
202  m_transmissionQueue.erase(queueItem);
203 
204  if (!m_transmissionQueue.empty()) {
205  boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
206  bind(&Impl::handleAsyncWrite, this, _1,
207  m_transmissionQueue.begin()));
208  }
209  }
210 
211  bool
212  processAll(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
213  {
214  while (offset < nBytesAvailable) {
215  bool isOk = false;
216  Block element;
217  std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
218  if (!isOk)
219  return false;
220 
221  m_transport.receive(element);
222  offset += element.size();
223  }
224  return true;
225  }
226 
227  void
228  handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
229  {
230  if (error)
231  {
232  if (error == boost::system::errc::operation_canceled) {
233  // async receive has been explicitly cancelled (e.g., socket close)
234  return;
235  }
236 
237  m_transport.close();
238  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
239  }
240 
241  m_inputBufferSize += nBytesRecvd;
242  // do magic
243 
244  std::size_t offset = 0;
245  bool hasProcessedSome = processAll(m_inputBuffer, offset, m_inputBufferSize);
246  if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
247  {
248  m_transport.close();
249  BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
250  "input buffer full, but a valid TLV cannot be "
251  "decoded"));
252  }
253 
254  if (offset > 0)
255  {
256  if (offset != m_inputBufferSize)
257  {
258  std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
259  m_inputBuffer);
260  m_inputBufferSize -= offset;
261  }
262  else
263  {
264  m_inputBufferSize = 0;
265  }
266  }
267 
268  m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
270  bind(&Impl::handleAsyncReceive, this, _1, _2));
271  }
272 
273 protected:
274  BaseTransport& m_transport;
275 
276  typename Protocol::socket m_socket;
279 
280  TransmissionQueue m_transmissionQueue;
282 
283  boost::asio::deadline_timer m_connectTimer;
284 };
285 
286 
287 template<class BaseTransport, class Protocol>
288 class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
289 {
290 public:
292 
293  StreamTransportWithResolverImpl(BaseTransport& transport, boost::asio::io_service& ioService)
294  : StreamTransportImpl<BaseTransport, Protocol>(transport, ioService)
295  {
296  }
297 
298  void
299  resolveHandler(const boost::system::error_code& error,
300  typename Protocol::resolver::iterator endpoint,
301  const shared_ptr<typename Protocol::resolver>&)
302  {
303  if (error)
304  {
305  if (error == boost::system::errc::operation_canceled)
306  return;
307 
308  BOOST_THROW_EXCEPTION(Transport::Error(error, "Error during resolution of host or port"));
309  }
310 
311  typename Protocol::resolver::iterator end;
312  if (endpoint == end)
313  {
314  this->m_transport.close();
315  BOOST_THROW_EXCEPTION(Transport::Error(error, "Unable to resolve because host or port"));
316  }
317 
318  this->m_socket.async_connect(*endpoint,
319  bind(&Impl::connectHandler, this, _1));
320  }
321 
322  void
323  connect(const typename Protocol::resolver::query& query)
324  {
325  if (!this->m_connectionInProgress) {
326  this->m_connectionInProgress = true;
327 
328  // Wait at most 4 seconds to connect
330  this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
331  this->m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
332 
333  // typename boost::asio::ip::basic_resolver< Protocol > resolver;
334  shared_ptr<typename Protocol::resolver> resolver =
335  make_shared<typename Protocol::resolver>(ref(this->m_socket.get_io_service()));
336 
337  resolver->async_resolve(query, bind(&Impl::resolveHandler, this, _1, _2, resolver));
338  }
339  }
340 };
341 
342 
343 } // namespace ndn
344 
345 #endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP
void connectTimeoutHandler(const boost::system::error_code &error)
Copyright (c) 2011-2015 Regents of the University of California.
static std::tuple< bool, Block > fromBuffer(ConstBufferPtr buffer, size_t offset)
Try to construct block from Buffer.
Definition: block.cpp:253
StreamTransportImpl(BaseTransport &transport, boost::asio::io_service &ioService)
Class representing a wire element of NDN-TLV packet format.
Definition: block.hpp:43
StreamTransportImpl< BaseTransport, Protocol > Impl
TransmissionQueue m_transmissionQueue
void handleAsyncReceive(const boost::system::error_code &error, std::size_t nBytesRecvd)
void send(const Block &wire)
Table::const_iterator iterator
Definition: cs-internal.hpp:41
size_t size() const
Definition: block.cpp:504
void connect(const typename Protocol::resolver::query &query)
void connect(const typename Protocol::endpoint &endpoint)
void handleAsyncWrite(const boost::system::error_code &error, TransmissionQueue::iterator queueItem)
void resolveHandler(const boost::system::error_code &error, typename Protocol::resolver::iterator endpoint, const shared_ptr< typename Protocol::resolver > &)
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE]
StreamTransportWithResolverImpl< BaseTransport, Protocol > Impl
void send(const Block &header, const Block &payload)
boost::asio::deadline_timer m_connectTimer
bool processAll(uint8_t *buffer, size_t &offset, size_t nBytesAvailable)
std::list< Block > BlockSequence
std::list< BlockSequence > TransmissionQueue
void connectHandler(const boost::system::error_code &error)
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size
Definition: tlv.hpp:39
StreamTransportWithResolverImpl(BaseTransport &transport, boost::asio::io_service &ioService)