/*
 * st_asio_wrapper_udp_socket.h
 *
 *  Created on: 2012-3-2
 *      Author: youngwolf
 *		email: mail2tao@163.com
 *		QQ: 676218192
 *		Community on QQ: 198941541
 *
 * this class is used at both client and server endpoints
 */

#ifndef ST_ASIO_WRAPPER_UDP_SOCKET_H_
#define ST_ASIO_WRAPPER_UDP_SOCKET_H_

#include <boost/array.hpp>

Y
youngwolf 已提交
18
#include "st_asio_wrapper_socket.h"
Y
youngwolf 已提交
19
#include "st_asio_wrapper_unpacker.h"
20

Y
youngwolf 已提交
21
//ST_ASIO_UDP_DEFAULT_IP_VERSION selects the IP version used by set_local_addr when the IP string is empty;
//when an IP string is supplied, the IP version is deduced from that address instead.
//boost::asio::ip::udp::v4() means ipv4 and boost::asio::ip::udp::v6() means ipv6.
#if !defined(ST_ASIO_UDP_DEFAULT_IP_VERSION)
#define ST_ASIO_UDP_DEFAULT_IP_VERSION boost::asio::ip::udp::v4()
#endif

Y
youngwolf 已提交
28 29
//default value of the Unpacker template argument of st_udp_socket_base; define it before including this header to override
#if !defined(ST_ASIO_DEFAULT_UDP_UNPACKER)
#define ST_ASIO_DEFAULT_UDP_UNPACKER udp_unpacker
#endif

32 33
namespace st_asio_wrapper
{
Y
youngwolf 已提交
34 35
namespace st_udp
{
36

Y
youngwolf 已提交
37
template <typename Packer = ST_ASIO_DEFAULT_PACKER, typename Unpacker = ST_ASIO_DEFAULT_UDP_UNPACKER, typename Socket = boost::asio::ip::udp::socket>
38
class st_udp_socket_base : public st_socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>>
Y
youngwolf 已提交
39
{
40
public:
41 42 43 44
	typedef udp_msg<typename Packer::msg_type> in_msg_type;
	typedef const in_msg_type in_msg_ctype;
	typedef udp_msg<typename Unpacker::msg_type> out_msg_type;
	typedef const out_msg_type out_msg_ctype;
Y
youngwolf 已提交
45 46

public:
47
	//re-export TIMER_BEGIN and TIMER_END from the st_socket base class
	using st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>::TIMER_BEGIN;
	using st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>::TIMER_END;
49

50 51
	//constructs the socket on the given io_service with a newly allocated Unpacker, then resets the state
	st_udp_socket_base(boost::asio::io_service& io_service_)
		: st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>(io_service_)
		, unpacker_(boost::make_shared<Unpacker>())
	{
		ST_THIS reset_state();
	}
52 53

	//reset all, be ensure that there's no any operations performed on this st_udp_socket when invoke it
54
	//please note, when reuse this st_udp_socket, st_object_pool will invoke reset(), child must re-write this to initialize
Y
youngowlf 已提交
55
	//all member variables, and then do not forget to invoke st_udp_socket::reset() to initialize father's
56
	//member variables
57 58
	virtual void reset()
	{
Y
youngwolf 已提交
59 60
		ST_THIS reset_state();
		ST_THIS clear_buffer();
61

Y
youngwolf 已提交
62
		boost::system::error_code ec;
Y
youngwolf 已提交
63 64
		ST_THIS lowest_layer().close(ec);
		ST_THIS lowest_layer().open(local_addr.protocol(), ec); assert(!ec);
Y
youngwolf 已提交
65
#ifndef ST_ASIO_NOT_REUSE_ADDRESS
Y
youngwolf 已提交
66
		ST_THIS lowest_layer().set_option(boost::asio::socket_base::reuse_address(true), ec); assert(!ec);
67
#endif
Y
youngwolf 已提交
68
		ST_THIS lowest_layer().bind(local_addr, ec); assert(!ec);
69 70
		if (ec)
			unified_out::error_out("bind failed.");
71 72
	}

73
	bool set_local_addr(unsigned short port, const std::string& ip = std::string())
74
	{
75
		if (ip.empty())
Y
youngwolf 已提交
76
			local_addr = boost::asio::ip::udp::endpoint(ST_ASIO_UDP_DEFAULT_IP_VERSION, port);
77 78
		else
		{
79 80 81 82 83 84
			boost::system::error_code ec;
			auto addr = boost::asio::ip::address::from_string(ip, ec);
			if (ec)
				return false;

			local_addr = boost::asio::ip::udp::endpoint(addr, port);
85
		}
86 87

		return true;
88
	}
Y
youngwolf 已提交
89
	//read-only access to the local endpoint stored by set_local_addr
	const boost::asio::ip::udp::endpoint& get_local_addr() const
	{
		return local_addr;
	}
90

91
	void disconnect() {force_close();}
92 93
	void force_close() {show_info("link:", "been closed."); clean_up();}
	void graceful_close() {force_close();}
94

Y
youngwolf 已提交
95
	//get or change the unpacker at runtime
	//changing the unpacker at runtime is not thread-safe; it can only be done in on_msg(), reset() or the constructor, please pay special attention.
	//this defect could be resolved via a mutex, but that is not considered worthwhile because this feature is not frequently used.
	//NOTE(review): the unpacker interface is parameterised on Packer::msg_type rather than Unpacker::msg_type - confirm this is intended.
	boost::shared_ptr<i_udp_unpacker<typename Packer::msg_type>> inner_unpacker()
	{
		return unpacker_;
	}
	boost::shared_ptr<const i_udp_unpacker<typename Packer::msg_type>> inner_unpacker() const
	{
		return unpacker_;
	}
	void inner_unpacker(const boost::shared_ptr<i_udp_unpacker<typename Packer::msg_type>>& _unpacker_)
	{
		unpacker_ = _unpacker_;
	}
101

102
	//bring the base class' send_msg into this class' scope (presumably so the overloads declared below do not hide it - confirm)
	using st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>::send_msg;
103 104
	///////////////////////////////////////////////////
	//msg sending interface
Y
youngwolf 已提交
105 106 107 108 109 110
	//the UDP_* macros are defined outside this file; each invocation presumably declares member function(s) with the given name - confirm
	UDP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
	UDP_SEND_MSG(send_native_msg, true) //use the packer with native = true to pack the msgs
	//guarantees that the msg is sent successfully even if can_overflow equals false;
	//success here just means that the msg has been put into st_udp_socket's send buffer
	UDP_SAFE_SEND_MSG(safe_send_msg, send_msg)
	UDP_SAFE_SEND_MSG(safe_send_native_msg, send_native_msg)
Y
youngwolf 已提交
111 112 113
	//like safe_send_msg and safe_send_native_msg, but non-blocking
	UDP_POST_MSG(post_msg, false)
	UDP_POST_MSG(post_native_msg, true)
114 115 116
	//msg sending interface
	///////////////////////////////////////////////////

117
	void show_info(const char* head, const char* tail) const {unified_out::info_out("%s %s:%hu %s", head, local_addr.address().to_string().c_str(), local_addr.port(), tail);}
118

119
protected:
120 121
	virtual bool do_start()
	{
Y
youngwolf 已提交
122
		if (!ST_THIS get_io_service().stopped())
123
		{
124
			do_recv_msg();
125 126 127 128 129 130 131 132 133
			return true;
		}

		return false;
	}

	//must mutex send_msg_buffer before invoke this function
	virtual bool do_send_msg()
	{
Y
youngwolf 已提交
134 135
		if (!is_send_allowed() || ST_THIS get_io_service().stopped())
			ST_THIS sending = false;
136
		else if (!ST_THIS sending && !ST_THIS send_msg_buffer.empty())
137
		{
Y
youngwolf 已提交
138
			ST_THIS sending = true;
139
			ST_THIS last_send_msg.swap(ST_THIS send_msg_buffer.front());
Y
youngwolf 已提交
140
			ST_THIS next_layer().async_send_to(boost::asio::buffer(ST_THIS last_send_msg.data(), ST_THIS last_send_msg.size()), ST_THIS last_send_msg.peer_addr,
141
				boost::bind(&st_udp_socket_base::send_handler, this,
Y
youngwolf 已提交
142
#ifdef ST_ASIO_ENHANCED_STABILITY
143 144 145
					ST_THIS async_call_indicator,
#endif
					boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
146
			ST_THIS send_msg_buffer.pop_front();
147 148
		}

Y
youngwolf 已提交
149
		return ST_THIS sending;
150 151
	}

152 153 154 155 156 157 158 159 160 161
	virtual void do_recv_msg()
	{
		ST_THIS next_layer().async_receive_from(unpacker_->prepare_next_recv(), peer_addr,
			boost::bind(&st_udp_socket_base::recv_handler, this,
#ifdef ST_ASIO_ENHANCED_STABILITY
				ST_THIS async_call_indicator,
#endif
				boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
	}

162
	virtual bool is_send_allowed() const {return ST_THIS lowest_layer().is_open() && st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>::is_send_allowed();}
163 164
	//can send data or not(just put into send buffer)

Y
youngwolf 已提交
165
	//logs a receive error; operation_aborted is not logged
	virtual void on_recv_error(const boost::system::error_code& ec)
	{
		if (boost::asio::error::operation_aborted == ec)
			return;

		unified_out::error_out("recv msg error (%d %s)", ec.value(), ec.message().data());
	}
170

Y
youngwolf 已提交
171 172
#ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
	//default implementation: prints the size and the content of the msg at debug level and returns true.
	//the content is printed with an explicit length (%.*s) because msg.data() holds received bytes, which are
	//not guaranteed to be NUL-terminated for every msg type; a plain %s could read past the end of the msg.
	virtual bool on_msg(out_msg_type& msg)
	{
		unified_out::debug_out("recv(" ST_ASIO_SF "): %.*s", msg.size(), static_cast<int>(msg.size()), msg.data());
		return true;
	}
#endif

Y
youngwolf 已提交
175
	//default implementation: prints the size and the content of the msg at debug level and returns true; link_down is not used here.
	//the content is printed with an explicit length (%.*s) because msg.data() holds received bytes, which are
	//not guaranteed to be NUL-terminated for every msg type; a plain %s could read past the end of the msg.
	virtual bool on_msg_handle(out_msg_type& msg, bool link_down)
	{
		unified_out::debug_out("recv(" ST_ASIO_SF "): %.*s", msg.size(), static_cast<int>(msg.size()), msg.data());
		return true;
	}
176 177 178

	void clean_up()
	{
179 180 181
		ST_THIS stop_all_timer();
		ST_THIS reset_state();

Y
youngwolf 已提交
182
		if (ST_THIS lowest_layer().is_open())
183
		{
Y
youngwolf 已提交
184
			boost::system::error_code ec;
Y
youngwolf 已提交
185 186
			ST_THIS lowest_layer().shutdown(boost::asio::ip::udp::socket::shutdown_both, ec);
			ST_THIS lowest_layer().close(ec);
187 188 189
		}
	}

Y
youngwolf 已提交
190
private:
191
	void recv_handler(
Y
youngwolf 已提交
192
#ifdef ST_ASIO_ENHANCED_STABILITY
193
		const boost::shared_ptr<char>& async_call_indicator,
194 195
#endif
		const boost::system::error_code& ec, size_t bytes_transferred)
196 197 198
	{
		if (!ec && bytes_transferred > 0)
		{
199 200
			ST_THIS temp_msg_buffer.resize(ST_THIS temp_msg_buffer.size() + 1);
			ST_THIS temp_msg_buffer.back().swap(peer_addr, unpacker_->parse_msg(bytes_transferred));
Y
youngwolf 已提交
201
			ST_THIS dispatch_msg();
202 203
		}
#ifdef _MSC_VER
204
		else if (boost::asio::error::connection_refused == ec || boost::asio::error::connection_reset == ec)
205
			do_start();
206 207 208 209 210
#endif
		else
			on_recv_error(ec);
	}

211
	void send_handler(
Y
youngwolf 已提交
212
#ifdef ST_ASIO_ENHANCED_STABILITY
213
		const boost::shared_ptr<char>& async_call_indicator,
214 215
#endif
		const boost::system::error_code& ec, size_t bytes_transferred)
216 217 218 219
	{
		if (!ec)
		{
			assert(bytes_transferred > 0);
Y
youngwolf 已提交
220
#ifdef ST_ASIO_WANT_MSG_SEND_NOTIFY
221
			ST_THIS on_msg_send(ST_THIS last_send_msg);
222 223 224
#endif
		}
		else
Y
youngwolf 已提交
225
			ST_THIS on_send_error(ec);
226

227
		boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex);
Y
youngwolf 已提交
228
		ST_THIS sending = false;
Y
youngwolf 已提交
229

230
		//send msg sequentially, that means second send only after first send success
231
		//under windows, send a msg to addr_any may cause sending errors, please note
Y
youngowlf 已提交
232
		//for UDP in st_asio_wrapper, sending error will not stop the following sending.
Y
youngwolf 已提交
233
#ifdef ST_ASIO_WANT_ALL_MSG_SEND_NOTIFY
234
		if (!do_send_msg())
235
			ST_THIS on_all_msg_send(ST_THIS last_send_msg);
236 237
#else
		do_send_msg();
238
#endif
239 240 241

		if (!ST_THIS sending)
			ST_THIS last_send_msg.clear();
242 243 244
	}

protected:
245
	//unpacker that supplies the receive buffer in do_recv_msg and parses received data in recv_handler;
	//it is created in the constructor and can be replaced through inner_unpacker()
	boost::shared_ptr<i_udp_unpacker<typename Packer::msg_type>> unpacker_;
Y
youngwolf 已提交
246
	boost::asio::ip::udp::endpoint peer_addr; //passed to async_receive_from in do_recv_msg to receive the sender's endpoint
	boost::asio::ip::udp::endpoint local_addr; //set by set_local_addr, the socket is bound to it in reset()
247
};
Y
youngwolf 已提交
248
//st_udp_socket_base with all template arguments left at their defaults
using st_udp_socket = st_udp_socket_base<>;
249

Y
youngwolf 已提交
250 251 252
} //namespace st_udp
} //namespace st_asio_wrapper

253
using namespace st_asio_wrapper::st_udp; //kept for compatibility with old versions which did not have the st_udp namespace.
254 255

#endif /* ST_ASIO_WRAPPER_UDP_SOCKET_H_ */