NS-3 based Named Data Networking (NDN) simulator
ndnSIM 2.5: NDN, CCN, CCNx, content centric networks
API Documentation
broadcast_server.cpp
Go to the documentation of this file.
2 
3 #include <websocketpp/server.hpp>
4 
5 #include <iostream>
6 #include <set>
7 
8 /*#include <boost/thread.hpp>
9 #include <boost/thread/mutex.hpp>
10 #include <boost/thread/condition_variable.hpp>*/
12 
14 
16 using websocketpp::lib::placeholders::_1;
17 using websocketpp::lib::placeholders::_2;
18 using websocketpp::lib::bind;
19 
20 using websocketpp::lib::thread;
21 using websocketpp::lib::mutex;
22 using websocketpp::lib::lock_guard;
23 using websocketpp::lib::unique_lock;
24 using websocketpp::lib::condition_variable;
25 
26 /* on_open insert connection_hdl into channel
27  * on_close remove connection_hdl from channel
28  * on_message queue send to all channels
29  */
30 
35 };
36 
37 struct action {
40  : type(t), hdl(h), msg(m) {}
41 
45 };
46 
47 class broadcast_server {
48 public:
50  // Initialize Asio Transport
51  m_server.init_asio();
52 
53  // Register handler callbacks
54  m_server.set_open_handler(bind(&broadcast_server::on_open,this,::_1));
55  m_server.set_close_handler(bind(&broadcast_server::on_close,this,::_1));
56  m_server.set_message_handler(bind(&broadcast_server::on_message,this,::_1,::_2));
57  }
58 
59  void run(uint16_t port) {
60  // listen on specified port
61  m_server.listen(port);
62 
63  // Start the server accept loop
64  m_server.start_accept();
65 
66  // Start the ASIO io_service run loop
67  try {
68  m_server.run();
69  } catch (const std::exception & e) {
70  std::cout << e.what() << std::endl;
71  }
72  }
73 
75  {
76  lock_guard<mutex> guard(m_action_lock);
77  //std::cout << "on_open" << std::endl;
78  m_actions.push(action(SUBSCRIBE,hdl));
79  }
80  m_action_cond.notify_one();
81  }
82 
84  {
85  lock_guard<mutex> guard(m_action_lock);
86  //std::cout << "on_close" << std::endl;
87  m_actions.push(action(UNSUBSCRIBE,hdl));
88  }
89  m_action_cond.notify_one();
90  }
91 
93  // queue message up for sending by processing thread
94  {
95  lock_guard<mutex> guard(m_action_lock);
96  //std::cout << "on_message" << std::endl;
97  m_actions.push(action(MESSAGE,hdl,msg));
98  }
99  m_action_cond.notify_one();
100  }
101 
103  while(1) {
104  unique_lock<mutex> lock(m_action_lock);
105 
106  while(m_actions.empty()) {
107  m_action_cond.wait(lock);
108  }
109 
110  action a = m_actions.front();
111  m_actions.pop();
112 
113  lock.unlock();
114 
115  if (a.type == SUBSCRIBE) {
116  lock_guard<mutex> guard(m_connection_lock);
117  m_connections.insert(a.hdl);
118  } else if (a.type == UNSUBSCRIBE) {
119  lock_guard<mutex> guard(m_connection_lock);
120  m_connections.erase(a.hdl);
121  } else if (a.type == MESSAGE) {
122  lock_guard<mutex> guard(m_connection_lock);
123 
124  con_list::iterator it;
125  for (it = m_connections.begin(); it != m_connections.end(); ++it) {
126  m_server.send(*it,a.msg);
127  }
128  } else {
129  // undefined.
130  }
131  }
132  }
133 private:
134  typedef std::set<connection_hdl,std::owner_less<connection_hdl> > con_list;
135 
136  server m_server;
137  con_list m_connections;
138  std::queue<action> m_actions;
139 
140  mutex m_action_lock;
141  mutex m_connection_lock;
142  condition_variable m_action_cond;
143 };
144 
145 int main() {
146  try {
147  broadcast_server server_instance;
148 
149  // Start a thread to run the processing loop
150  thread t(bind(&broadcast_server::process_messages,&server_instance));
151 
152  // Run the asio loop with the main thread
153  server_instance.run(9002);
154 
155  t.join();
156 
157  } catch (websocketpp::exception const & e) {
158  std::cout << e.what() << std::endl;
159  }
160 }
void on_close(connection_hdl hdl)
int main()
action(action_type t, connection_hdl h, server::message_ptr m)
connection_type::message_ptr message_ptr
Type of message pointers that this endpoint uses.
Definition: endpoint.hpp:70
lib::weak_ptr< void > connection_hdl
A handle to uniquely identify a connection.
void on_message(connection_hdl hdl, server::message_ptr msg)
websocketpp::connection_hdl hdl
void on_open(connection_hdl hdl)
action_type
action(action_type t, connection_hdl h)
server::message_ptr msg
websocketpp::server< websocketpp::config::asio > server
void run(uint16_t port)
action_type type
virtual char const * what() const
Definition: error.hpp:263