26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP 27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP 42 template<
class Protocol>
72 handleSend(
const boost::system::error_code& error,
80 size_t nBytesReceived);
86 handleError(
const boost::system::error_code& error);
104 size_t m_receiveBufferSize;
105 std::queue<Block> m_sendQueue;
106 size_t m_sendQueueBytes;
112 : m_socket(
std::move(socket))
113 , m_receiveBufferSize(0)
114 , m_sendQueueBytes(0)
129 NFD_LOG_FACE_WARN(
"Failed to obtain send queue length from socket: " << std::strerror(errno));
131 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
140 if (m_socket.is_open()) {
144 boost::system::error_code error;
145 m_socket.cancel(error);
146 m_socket.shutdown(protocol::socket::shutdown_both, error);
175 boost::system::error_code error;
176 m_socket.close(error);
178 this->setState(TransportState::CLOSED);
187 if (getState() != TransportState::UP)
190 bool wasQueueEmpty = m_sendQueue.empty();
191 m_sendQueue.push(packet.packet);
192 m_sendQueueBytes += packet.packet.size();
202 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
204 boost::asio::placeholders::error,
205 boost::asio::placeholders::bytes_transferred));
214 return processErrorCode(error);
218 BOOST_ASSERT(!m_sendQueue.empty());
219 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
220 m_sendQueueBytes -= nBytesSent;
223 if (!m_sendQueue.empty())
231 BOOST_ASSERT(getState() == TransportState::UP);
233 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
236 boost::asio::placeholders::error,
237 boost::asio::placeholders::bytes_transferred));
243 size_t nBytesReceived)
246 return processErrorCode(error);
250 m_receiveBufferSize += nBytesReceived;
253 while (m_receiveBufferSize - offset > 0) {
255 std::tie(isOk, element) = Block::fromBuffer(m_receiveBuffer + offset, m_receiveBufferSize - offset);
259 offset += element.
size();
260 BOOST_ASSERT(offset <= m_receiveBufferSize);
267 this->setState(TransportState::FAILED);
273 if (offset != m_receiveBufferSize) {
274 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
275 m_receiveBufferSize -= offset;
278 m_receiveBufferSize = 0;
291 if (getState() == TransportState::CLOSING ||
292 getState() == TransportState::FAILED ||
293 getState() == TransportState::CLOSED ||
294 error == boost::asio::error::operation_aborted ||
295 error == boost::asio::error::shut_down)
306 if (error == boost::asio::error::eof) {
307 this->setState(TransportState::CLOSING);
311 this->setState(TransportState::FAILED);
320 m_receiveBufferSize = 0;
327 std::queue<Block> emptyQueue;
328 std::swap(emptyQueue, m_sendQueue);
329 m_sendQueueBytes = 0;
336 return m_sendQueueBytes;
342 #endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP void doClose() override
performs Transport specific operations to close the transport
void processErrorCode(const boost::system::error_code &error)
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
virtual void handleError(const boost::system::error_code &error)
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
stores a packet along with the remote endpoint
const ssize_t QUEUE_ERROR
indicates that the transport was unable to retrieve the queue capacity/length
detail::SimulatorIo & getGlobalIoService()
Represents a TLV element of NDN packet format.
size_t getSendQueueBytes() const
Implements Transport for stream-based protocols.
protocol::socket m_socket
NFD_LOG_INCLASS_DECLARE()
size_t size() const
Get size of encoded wire, including Type-Length-Value.
Copyright (c) 2011-2015 Regents of the University of California.
void post(const std::function< void()> &callback)
StreamTransport(typename protocol::socket &&socket)
Construct stream transport.
void doSend(Transport::Packet &&packet) override
performs Transport specific operations to send a packet
ssize_t getTxQueueLength(int fd)
obtain send queue length from a specified system socket
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
void resetReceiveBuffer()
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
ssize_t getSendQueueLength() override
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size