st_asio_wrapper_udp_socket.h 7.6 KB
Newer Older
1 2 3 4 5
/*
 * st_asio_wrapper_udp_socket.h
 *
 *  Created on: 2012-3-2
 *      Author: youngwolf
6 7 8
 *		email: mail2tao@163.com
 *		QQ: 676218192
 *		Community on QQ: 198941541
9 10 11 12 13 14 15 16 17
 *
 * this class used at both client and server endpoint
 */

#ifndef ST_ASIO_WRAPPER_UDP_SOCKET_H_
#define ST_ASIO_WRAPPER_UDP_SOCKET_H_

#include <boost/array.hpp>

Y
youngwolf 已提交
18
#include "st_asio_wrapper_socket.h"
19 20 21

using namespace boost::asio::ip;

Y
youngwolf 已提交
22 23 24
//in set_local_addr, if the ip is empty, UDP_DEFAULT_IP_VERSION will define the ip version,
//or, the ip version will be deduced by the ip address.
//udp::v4() means ipv4 and udp::v6() means ipv6.
25 26 27 28 29 30
#ifndef UDP_DEFAULT_IP_VERSION
#define UDP_DEFAULT_IP_VERSION udp::v4()
#endif

namespace st_asio_wrapper
{
Y
youngwolf 已提交
31 32
namespace st_udp
{
33

Y
youngwolf 已提交
34
struct udp_msg
35
{
Y
youngwolf 已提交
36 37
	udp::endpoint peer_addr;
	std::string str;
38

Y
youngwolf 已提交
39 40 41 42 43 44 45
	void swap(udp_msg& other) {std::swap(peer_addr, other.peer_addr); str.swap(other.str);}
	void swap(const udp::endpoint& addr, std::string& tmp_str) {peer_addr = addr; str.swap(tmp_str);}
	void clear() {peer_addr = udp::endpoint(); str.clear();}
	bool operator==(const udp_msg& other) const {return this == &other;}
};
typedef udp_msg msg_type;
typedef const msg_type msg_ctype;
46

Y
youngwolf 已提交
47 48
class st_udp_socket : public st_socket<udp_msg, udp::socket>
{
49
public:
Y
youngwolf 已提交
50
	st_udp_socket(io_service& io_service_) : st_socket(io_service_) {reset_state();}
51 52 53 54 55 56 57 58 59 60 61 62 63

	void set_local_addr(unsigned short port, const std::string& ip = std::string())
	{
		error_code ec;
		if (ip.empty())
			local_addr = udp::endpoint(UDP_DEFAULT_IP_VERSION, port);
		else
		{
			local_addr = udp::endpoint(address::from_string(ip, ec), port);
			assert(!ec);
		}
	}

Y
youngwolf 已提交
64
	virtual void start()
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
	{
		if (!get_io_service().stopped())
			async_receive_from(buffer(raw_buff), peer_addr,
				boost::bind(&st_udp_socket::recv_handler, this, placeholders::error, placeholders::bytes_transferred));
	}

	//reset all, be ensure that there's no any operations performed on this st_udp_socket when invoke it
	virtual void reset()
	{
		reset_state();
		clear_buffer();

		error_code ec;
		close(ec);
		open(local_addr.protocol(), ec); assert(!ec);
#ifndef NOT_REUSE_ADDRESS
		set_option(socket_base::reuse_address(true), ec); assert(!ec);
#endif
		bind(local_addr, ec); assert(!ec);
		if (ec) {unified_out::error_out("bind failed.");}
	}

	void disconnect() {force_close();}
	void force_close() {clean_up();}
	void graceful_close() {clean_up();}

	//udp does not need a unpacker

	///////////////////////////////////////////////////
	//msg sending interface
Y
youngwolf 已提交
95 96 97 98 99 100
	UDP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
	UDP_SEND_MSG(send_native_msg, true) //use the packer with native = true to pack the msgs
	//guarantee send msg successfully even if can_overflow equal to false
	//success at here just means put the msg into st_udp_socket's send buffer
	UDP_SAFE_SEND_MSG(safe_send_msg, send_msg)
	UDP_SAFE_SEND_MSG(safe_send_native_msg, send_native_msg)
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
	//msg sending interface
	///////////////////////////////////////////////////

	//don't use the packer but insert into the send_msg_buffer directly
	bool direct_send_msg(const udp::endpoint& peer_addr, const std::string& str, bool can_overflow = false)
		{return direct_send_msg(peer_addr, std::string(str), can_overflow);}
	bool direct_send_msg(const udp::endpoint& peer_addr, std::string&& str, bool can_overflow = false)
	{
		mutex::scoped_lock lock(send_msg_buffer_mutex);
		if (can_overflow || send_msg_buffer.size() < MAX_MSG_NUM)
			return direct_insert_msg(peer_addr, std::move(str));

		return false;
	}

	//send buffered msgs, return false if send buffer is empty or invalidate status
	bool send_msg()
	{
		mutex::scoped_lock lock(send_msg_buffer_mutex);
		return do_send_msg();
	}

protected:
Y
youngwolf 已提交
124 125
	virtual void on_recv_error(const error_code& ec)
		{unified_out::error_out("recv msg error: %d %s", ec.value(), ec.message().data());}
126 127 128 129 130

#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
	//if you want to use your own recv buffer, you can move the msg to your own recv buffer,
	//and return false, then, handle the msg as your own strategy(may be you'll need a msg dispatch thread)
	//or, you can handle the msg at here and return false, but this will reduce efficiency(
Y
youngwolf 已提交
131
	//because this msg handling block the next msg receiving on the same st_tcp_socket) unless you can
132 133 134 135 136 137
	//handle the msg very fast(which will inversely more efficient, because msg recv buffer and msg dispatching
	//are not needed any more).
	//
	//return true means use the msg recv buffer, you must handle the msgs in on_msg_handle()
	//notice: on_msg_handle() will not be invoked from within this function
	//
138
	//notice: using inconstant is for the convenience of swapping
139 140 141 142 143 144
	virtual bool on_msg(msg_type& msg)
		{unified_out::debug_out("recv(" size_t_format "): %s", msg.str.size(), msg.str.data()); return false;}
#endif

	//handling msg at here will not block msg receiving
	//if on_msg() return false, this function will not be invoked due to no msgs need to dispatch
145
	//notice: using inconstant is for the convenience of swapping
146 147 148 149 150 151 152 153 154 155 156 157 158 159
	virtual void on_msg_handle(msg_type& msg)
		{unified_out::debug_out("recv(" size_t_format "): %s", msg.str.size(), msg.str.data());}

#ifdef WANT_MSG_SEND_NOTIFY
	//one msg has sent to the kernel buffer
	virtual void on_msg_send(msg_type& msg) {}
#endif
#ifdef WANT_ALL_MSG_SEND_NOTIFY
	//send buffer goes empty
	virtual void on_all_msg_send(msg_type& msg) {}
#endif

	void clean_up()
	{
Y
youngwolf 已提交
160
		if (is_open())
161
		{
Y
youngwolf 已提交
162 163 164
			error_code ec;
			shutdown(udp::socket::shutdown_both, ec);
			close(ec);
165 166
		}

Y
youngwolf 已提交
167 168
		stop_all_timer();
		reset_state();
169 170 171 172 173 174 175
	}

	void recv_handler(const error_code& ec, size_t bytes_transferred)
	{
		if (!ec && bytes_transferred > 0)
		{
			std::string tmp_str(raw_buff.data(), bytes_transferred);
Y
youngwolf 已提交
176
			temp_msg_buffer.resize(temp_msg_buffer.size() + 1);
177
			temp_msg_buffer.back().swap(peer_addr, tmp_str);
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
			auto all_dispatched = dispatch_msg();

			if (all_dispatched)
				start(); //recv msg sequentially, that means second recv only after first recv success
			else
				set_timer(0, 50, nullptr);
		}
#ifdef _MSC_VER
		else if (WSAECONNREFUSED == ec.value())
			start();
#endif
		else
			on_recv_error(ec);
	}

	void send_handler(const error_code& ec, size_t bytes_transferred)
	{
		if (!ec)
		{
			assert(bytes_transferred > 0);
#ifdef WANT_MSG_SEND_NOTIFY
			on_msg_send(last_send_msg);
#endif
		}
		else
			on_send_error(ec);
		//under windows, send a msg to addr_any may cause sending errors, please note
		//for udp in st_asio_wrapper, sending error will not stop the following sending.

		mutex::scoped_lock lock(send_msg_buffer_mutex);
		sending = false;
Y
youngwolf 已提交
209

210 211
		//send msg sequentially, that means second send only after first send success
		if (!do_send_msg())
Y
youngwolf 已提交
212
		{
213
#ifdef WANT_ALL_MSG_SEND_NOTIFY
Y
youngwolf 已提交
214 215
			lock.unlock();
			on_all_msg_send(last_send_msg);
216
#endif
Y
youngwolf 已提交
217
		}
218 219 220 221 222
	}

	//must mutex send_msg_buffer before invoke this function
	bool do_send_msg()
	{
Y
youngwolf 已提交
223
		if (!is_send_allowed() || get_io_service().stopped())
224 225 226 227
			sending = false;
		else if (!sending && !send_msg_buffer.empty())
		{
			sending = true;
228
			last_send_msg.swap(send_msg_buffer.front());
229 230
			async_send_to(buffer(last_send_msg.str), last_send_msg.peer_addr,
				boost::bind(&st_udp_socket::send_handler, this, placeholders::error, placeholders::bytes_transferred));
231 232 233 234 235 236 237 238 239 240 241
			send_msg_buffer.pop_front();
		}

		return sending;
	}

	//must mutex send_msg_buffer before invoke this function
	bool direct_insert_msg(const udp::endpoint& peer_addr, std::string&& str)
	{
		if (!str.empty())
		{
Y
youngwolf 已提交
242
			send_msg_buffer.resize(send_msg_buffer.size() + 1);
243
			send_msg_buffer.back().swap(peer_addr, str);
244 245 246
			do_send_msg();
		}

Y
youngwolf 已提交
247
		return true;
248 249 250 251
	}

protected:
	array<char, MAX_MSG_LEN> raw_buff;
Y
youngwolf 已提交
252
	udp::endpoint peer_addr, local_addr;
253 254
};

Y
youngwolf 已提交
255 256 257 258
} //namespace st_udp
} //namespace st_asio_wrapper

using namespace st_asio_wrapper::st_udp;
259 260

#endif /* ST_ASIO_WRAPPER_UDP_SOCKET_H_ */