socket.h 8.1 KB
Newer Older
Y
youngwolf 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * socket.h
 *
 *  Created on: 2012-3-2
 *      Author: youngwolf
 *		email: mail2tao@163.com
 *		QQ: 676218192
 *		Community on QQ: 198941541
 *
 * UDP socket
 */

#ifndef _ASCS_UDP_SOCKET_H_
#define _ASCS_UDP_SOCKET_H_

#include "../socket.h"

namespace ascs { namespace udp {

Y
youngwolf 已提交
20 21 22 23
template <typename Packer, typename Unpacker, typename Socket = asio::ip::udp::socket,
	template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
	template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class socket_base : public socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer>
Y
youngwolf 已提交
24 25
{
protected:
Y
youngwolf 已提交
26
	//shorthand for the ascs::socket instantiation this class derives from (same template arguments as in the base-class list)
	typedef socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer> super;
Y
youngwolf 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49

public:
	//udp_msg wrapper around the packer's message type, plus its const variant
	typedef udp_msg<typename Packer::msg_type> in_msg_type;
	typedef const in_msg_type in_msg_ctype;
	//udp_msg wrapper around the unpacker's message type (this is what on_msg() and on_msg_handle() receive), plus its const variant
	typedef udp_msg<typename Unpacker::msg_type> out_msg_type;
	typedef const out_msg_type out_msg_ctype;

public:
	//re-declare the base class's TIMER_BEGIN / TIMER_END names in this class's public section
	using super::TIMER_BEGIN;
	using super::TIMER_END;

	//constructs the base class on io_service_ and installs a default-constructed Unpacker as the initial unpacker
	socket_base(asio::io_service& io_service_)
		: super(io_service_)
		, unpacker_(std::make_shared<Unpacker>())
	{
	}

	//reset all, be ensure that there's no any operations performed on this udp::socket when invoke it
	//please note, when reuse this udp::socket, object_pool will invoke reset(), child must re-write this to initialize
	//all member variables, and then do not forget to invoke udp::socket::reset() to initialize father's
	//member variables
	virtual void reset()
	{
		reset_state();
		super::reset();

		asio::error_code ec;
W
wolf 已提交
50
		this->lowest_layer().open(local_addr.protocol(), ec); assert(!ec);
Y
youngwolf 已提交
51
#ifndef ASCS_NOT_REUSE_ADDRESS
W
wolf 已提交
52
		this->lowest_layer().set_option(asio::socket_base::reuse_address(true), ec); assert(!ec);
Y
youngwolf 已提交
53
#endif
W
wolf 已提交
54
		this->lowest_layer().bind(local_addr, ec); assert(!ec);
Y
youngwolf 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
		if (ec)
			unified_out::error_out("bind failed.");
	}

	//invokes reset_state() on the current unpacker first, then on the base class; reset() calls this before super::reset()
	void reset_state()
	{
		unpacker_->reset_state();
		super::reset_state();
	}

	bool set_local_addr(unsigned short port, const std::string& ip = std::string())
	{
		if (ip.empty())
			local_addr = asio::ip::udp::endpoint(ASCS_UDP_DEFAULT_IP_VERSION, port);
		else
		{
			asio::error_code ec;
			auto addr = asio::ip::address::from_string(ip, ec);
			if (ec)
				return false;

			local_addr = asio::ip::udp::endpoint(addr, port);
		}

		return true;
	}
	const asio::ip::udp::endpoint& get_local_addr() const {return local_addr;}

Y
youngwolf 已提交
83 84 85
	//all three entry points do the same thing: print the shut-down info line, then call shutdown()
	void disconnect()
	{
		force_shutdown();
	}

	void force_shutdown()
	{
		show_info("link:", "been shut down.");
		shutdown();
	}

	void graceful_shutdown()
	{
		force_shutdown();
	}
Y
youngwolf 已提交
86 87 88 89

	//get or change the unpacker at runtime
	//changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
	//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
W
wolf 已提交
90 91 92
	std::shared_ptr<i_unpacker<typename Unpacker::msg_type>> inner_unpacker() {return unpacker_;}
	std::shared_ptr<const i_unpacker<typename Unpacker::msg_type>> inner_unpacker() const {return unpacker_;}
	void inner_unpacker(const std::shared_ptr<i_unpacker<typename Unpacker::msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
Y
youngwolf 已提交
93 94 95 96 97 98 99

	//keep the base class's send_msg visible in this class (send_handler() calls this->send_msg() with no arguments)
	using super::send_msg;
	///////////////////////////////////////////////////
	//msg sending interface
	//NOTE(review): UDP_SEND_MSG is a macro defined outside this file; presumably each use declares the sending
	//function(s) named by its first argument -- confirm against the macro's definition
	UDP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
	UDP_SEND_MSG(send_native_msg, true) //use the packer with native = true to pack the msgs
	//guarantees that the msg is sent successfully even if can_overflow is false
100
	//success here just means that the msg has been put into udp::socket_base's send buffer
Y
youngwolf 已提交
101 102 103 104 105
	//NOTE(review): UDP_SAFE_SEND_MSG is a macro defined outside this file; presumably it declares the function named
	//by its first argument on top of the function named by its second argument -- confirm against the macro's definition
	UDP_SAFE_SEND_MSG(safe_send_msg, send_msg)
	UDP_SAFE_SEND_MSG(safe_send_native_msg, send_native_msg)
	//msg sending interface
	///////////////////////////////////////////////////

106
	void show_info(const char* head, const char* tail) const {unified_out::info_out("%s %s:%hu %s", head, local_addr.address().to_string().data(), local_addr.port(), tail);}
Y
youngwolf 已提交
107 108 109 110

protected:
	virtual bool do_start()
	{
W
wolf 已提交
111
		if (!this->stopped())
Y
youngwolf 已提交
112 113 114 115 116 117 118 119
		{
			do_recv_msg();
			return true;
		}

		return false;
	}

120 121
	//ascs::socket will guarantee not call this function in more than one thread concurrently.
	//return false if send buffer is empty or sending not allowed or io_service stopped
Y
youngwolf 已提交
122 123
	virtual bool do_send_msg()
	{
124
		if (is_send_allowed() && !this->stopped() && !this->send_msg_buffer.empty() && this->send_msg_buffer.try_dequeue(last_send_msg))
Y
youngwolf 已提交
125
		{
Y
youngwolf 已提交
126
			this->stat.send_delay_sum += statistic::now() - last_send_msg.begin_time;
Y
youngwolf 已提交
127 128

			last_send_msg.restart();
W
wolf 已提交
129 130 131
			std::shared_lock<std::shared_mutex> lock(shutdown_mutex);
			this->next_layer().async_send_to(asio::buffer(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr,
				this->make_handler_error_size([this](const auto& ec, auto bytes_transferred) {this->send_handler(ec, bytes_transferred);}));
132 133

			return true;
Y
youngwolf 已提交
134 135
		}

136
		return false;
Y
youngwolf 已提交
137 138 139 140 141 142 143
	}

	virtual void do_recv_msg()
	{
		auto recv_buff = unpacker_->prepare_next_recv();
		assert(asio::buffer_size(recv_buff) > 0);

Y
youngwolf 已提交
144
		std::shared_lock<std::shared_mutex> lock(shutdown_mutex);
W
wolf 已提交
145 146
		this->next_layer().async_receive_from(recv_buff, peer_addr,
			this->make_handler_error_size([this](const auto& ec, auto bytes_transferred) {this->recv_handler(ec, bytes_transferred);}));
Y
youngwolf 已提交
147 148
	}

W
wolf 已提交
149
	//can data be sent or not (sending here just means putting it into the send buffer):
	//the underlying socket must be open and the base class must allow sending as well
	virtual bool is_send_allowed()
	{
		if (!this->lowest_layer().is_open())
			return false;

		return super::is_send_allowed();
	}

	//logs a receive error with its numeric value and message; operation_aborted is not logged
	virtual void on_recv_error(const asio::error_code& ec)
	{
		if (asio::error::operation_aborted == ec)
			return;

		unified_out::error_out("recv msg error (%d %s)", ec.value(), ec.message().data());
	}

#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
	//default handler for a received msg (only available when ASCS_FORCE_TO_USE_MSG_RECV_BUFFER is not defined):
	//logs the msg's size and content at debug level and returns true.
	//the content is printed with an explicit length (%.*s) because the msg type comes from the Unpacker template
	//parameter and its buffer is not guaranteed to be NUL-terminated; a plain %s could read past the end of it.
	virtual bool on_msg(out_msg_type& msg)
	{
		unified_out::debug_out("recv(" ASCS_SF "): %.*s", msg.size(), static_cast<int>(msg.size()), msg.data());
		return true;
	}
#endif

	//default handler with the same logging as above; link_down is not used by this default implementation. returns true.
	virtual bool on_msg_handle(out_msg_type& msg, bool link_down)
	{
		unified_out::debug_out("recv(" ASCS_SF "): %.*s", msg.size(), static_cast<int>(msg.size()), msg.data());
		return true;
	}

Y
youngwolf 已提交
164
	void shutdown()
Y
youngwolf 已提交
165
	{
W
wolf 已提交
166 167 168 169 170
		std::unique_lock<std::shared_mutex> lock(shutdown_mutex);

		this->stop_all_timer();
		this->close(); //must after stop_all_timer(), it's very important
		this->started_ = false;
Y
youngwolf 已提交
171
//		reset_state();
Y
youngwolf 已提交
172

W
wolf 已提交
173
		if (this->lowest_layer().is_open())
Y
youngwolf 已提交
174 175
		{
			asio::error_code ec;
W
wolf 已提交
176 177
			this->lowest_layer().shutdown(asio::ip::udp::socket::shutdown_both, ec);
			this->lowest_layer().close(ec);
Y
youngwolf 已提交
178 179 180 181 182 183 184 185
		}
	}

private:
	//completion handler of the asynchronous receive started by do_recv_msg()
	void recv_handler(const asio::error_code& ec, size_t bytes_transferred)
	{
		const bool got_data = !ec && bytes_transferred > 0;
		if (!got_data)
		{
#ifdef _MSC_VER
			//with MSVC, connection_refused / connection_reset just restart receiving instead of being reported
			if (asio::error::connection_refused == ec || asio::error::connection_reset == ec)
			{
				do_start();
				return;
			}
#endif
			on_recv_error(ec);
			return;
		}

		//update the receiving statistics
		++this->stat.recv_msg_sum;
		this->stat.recv_byte_sum += bytes_transferred;

		//append one empty slot and swap the sender's endpoint plus the parsed msg into it
		this->temp_msg_buffer.resize(this->temp_msg_buffer.size() + 1);
		this->temp_msg_buffer.back().swap(peer_addr, unpacker_->parse_msg(bytes_transferred));
		this->handle_msg();
	}

	//completion handler of the asynchronous send started by do_send_msg()
	void send_handler(const asio::error_code& ec, size_t bytes_transferred)
	{
		if (ec)
			this->on_send_error(ec);
		else
		{
			assert(bytes_transferred == last_send_msg.size());

			//update the sending statistics
			this->stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
			this->stat.send_byte_sum += bytes_transferred;
			++this->stat.send_msg_sum;
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
			this->on_msg_send(last_send_msg);
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
			if (this->send_msg_buffer.empty())
				this->on_all_msg_send(last_send_msg);
#endif
		}
		last_send_msg.clear();

		//msgs are sent sequentially, which means the second sending starts only after the first one has completed.
		//on windows, sending a msg to addr_any may cause errors, please note.
		//for UDP, a sending error does not stop subsequent sendings.
		if (do_send_msg())
			return;

		this->sending = false;
		if (!this->send_msg_buffer.empty())
			this->send_msg(); //just make sure no pending msgs
	}

protected:
	typename super::in_msg last_send_msg;
W
wolf 已提交
234
	std::shared_ptr<i_unpacker<typename Unpacker::msg_type>> unpacker_;
Y
youngwolf 已提交
235 236
	asio::ip::udp::endpoint peer_addr, local_addr;

Y
youngwolf 已提交
237
	std::shared_mutex shutdown_mutex;
Y
youngwolf 已提交
238 239 240 241 242
};

}} //namespace

#endif /* _ASCS_UDP_SOCKET_H_ */