//echo_server.cpp

#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>

//configuration
//these macros are defined before the ascs header is included further down, presumably so that the library sees them -- confirm against ascs' config.h
#define ASCS_SERVER_PORT		9527
#define ASCS_REUSE_OBJECT //use objects pool
//#define ASCS_FREE_OBJECT_INTERVAL	60 //it's useless if macro ASCS_REUSE_OBJECT has been defined
//#define ASCS_SYNC_DISPATCH //do not open this feature, see below for more details
#define ASCS_DISPATCH_BATCH_MSG
//#define ASCS_FULL_STATISTIC //full statistic will slightly impact efficiency
#define ASCS_USE_STEADY_TIMER
#define ASCS_ALIGNED_TIMER
#define ASCS_AVOID_AUTO_STOP_SERVICE
#define ASCS_DECREASE_THREAD_AT_RUNTIME
//#define ASCS_MAX_SEND_BUF	65536
//#define ASCS_MAX_RECV_BUF	65536
//if there's a huge number of links, please reduce the message buffers via macros ASCS_MAX_SEND_BUF and ASCS_MAX_RECV_BUF.
//please think about how much memory we can occupy at most with 512 links and the default ASCS_MAX_SEND_BUF and ASCS_MAX_RECV_BUF:
//it's 2 * 1M * 512 = 1G

//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE	0
//0-default packer and unpacker, head(length) + body
//1-packer2 and unpacker2, head(length) + body
//2-fixed length packer and unpacker
//3-prefix and/or suffix packer and unpacker

#if 0 == PACKER_UNPACKER_TYPE
#define ASCS_HUGE_MSG
#define ASCS_MSG_BUFFER_SIZE 1000000
#define ASCS_MAX_SEND_BUF (10 * ASCS_MSG_BUFFER_SIZE)
#define ASCS_MAX_RECV_BUF (10 * ASCS_MSG_BUFFER_SIZE)
#define ASCS_DEFAULT_UNPACKER flexible_unpacker<std::string>
//this unpacker only pre-allocates a buffer of 4000 bytes, but it can parse messages up to ASCS_MSG_BUFFER_SIZE (here is 1000000) bytes,
//it works as the default unpacker for messages <= 4000, otherwise, it works as non_copy_unpacker
#elif 1 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER packer2<unique_buffer<basic_buffer>, basic_buffer, packer<basic_buffer>>
#define ASCS_DEFAULT_UNPACKER unpacker2<unique_buffer<basic_buffer>, basic_buffer, flexible_unpacker<>>
#elif 2 == PACKER_UNPACKER_TYPE
#undef ASCS_HEARTBEAT_INTERVAL
#define ASCS_HEARTBEAT_INTERVAL	0 //not support heartbeat
#define ASCS_DEFAULT_PACKER fixed_length_packer
#define ASCS_DEFAULT_UNPACKER fixed_length_unpacker
#elif 3 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER prefix_suffix_packer
#define ASCS_DEFAULT_UNPACKER prefix_suffix_unpacker
#endif
//configuration

//the ascs header is included after the configuration macros above have been defined
#include <ascs/ext/tcp.h>
using namespace ascs;
using namespace ascs::tcp;
using namespace ascs::ext;
using namespace ascs::ext::tcp;

//console commands accepted by main's input loop
//they stay macros (not constants) because QUIT_COMMAND is concatenated with adjacent string literals in main
#define QUIT_COMMAND	"quit"
#define RESTART_COMMAND	"restart"
#define STATUS			"status"
#define STATISTIC		"statistic"
#define LIST_ALL_CLIENT	"list all client"
#define INCREASE_THREAD	"increase thread"
#define DECREASE_THREAD	"decrease thread"

//demonstrate how to use custom packer
Y
youngwolf 已提交
65
//under the default behavior, each tcp::socket has their own packer, and cause memory waste
Y
youngwolf 已提交
66 67 68 69
//at here, we make each echo_socket use the same global packer for memory saving
//notice: do not do this for unpacker, because unpacker has member variables and can't share each other
auto global_packer(std::make_shared<ASCS_DEFAULT_PACKER>());

Y
youngwolf 已提交
70
//demonstrate how to control the type of tcp::server_socket_base::server from template parameter
Y
youngwolf 已提交
71 72 73 74 75 76
class i_echo_server : public i_server
{
public:
	virtual void test() = 0;
};

77
//the echo server's socket: every received message is sent back to its sender unchanged.
//i_echo_server is passed to server_socket2 as template parameter, so get_server() gives access to i_echo_server::test.
class echo_socket : public server_socket2<i_echo_server>
{
private:
	typedef server_socket2<i_echo_server> super;

public:
	echo_socket(i_echo_server& server_) : super(server_)
	{
		//replace this socket's packer with the one shared by all echo_socket objects
		packer(global_packer);

#if 3 == PACKER_UNPACKER_TYPE
		//the prefix/suffix unpacker needs to be told its delimiters, main does the same for global_packer
		std::dynamic_pointer_cast<ASCS_DEFAULT_UNPACKER>(unpacker())->prefix_suffix("begin", "end");
#endif
	}

public:
	//because we use objects pool (ASCS_REUSE_OBJECT has been defined), strictly speaking, this virtual
	//function must be rewritten, but we have no member variables to initialize and just invoke the parent's
	//reset() directly, so it could be omitted, we keep it for possible future use
	virtual void reset() {super::reset();}

protected:
	virtual void on_recv_error(const asio::error_code& ec)
	{
		//the type of tcp::server_socket_base::server now can be controlled by derived class (echo_socket),
		//which is actually i_echo_server, so, we can invoke i_echo_server::test virtual function.
		get_server().test();
		super::on_recv_error(ec);
	}

	//msg handling: send the original msg back (echo server)
#ifdef ASCS_SYNC_DISPATCH //do not open this feature
	//do not hold msg_can for further using, return from on_msg as quickly as possible
	//access msg_can freely within this callback, it's always thread safe.
	virtual size_t on_msg(std::list<out_msg_type>& msg_can)
	{
		if (!is_send_buffer_available())
			return 0;
		//here if we cannot handle all messages in msg_can, do not use sync message dispatching unless we can bear message disordering,
		//this is because on_msg_handle can be invoked concurrently with the next on_msg (new messages arrived) and then disorder messages.
		//and do not try to handle all messages here (just for echo_server's business logic) because:
		//1. we cannot use safe_send_msg, we should not block service threads.
		//2. if we use true can_overflow to call send_msg, then buffer usage will be out of control, we should not take this risk.

		//following statement can avoid one memory replication if the type of out_msg_type and in_msg_type are identical.
		ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->send_msg(std::move(msg), true);});
		msg_can.clear();

		return 1;
		//if we indeed handled some messages, do return 1
		//if we handled nothing, return 1 is also okay but will very slightly impact performance (if msg_can is not empty), return 0 is suggested
	}
#endif

#ifdef ASCS_DISPATCH_BATCH_MSG
	//do not hold msg_can for further using, access msg_can and return from on_msg_handle as quickly as possible
	//can only access msg_can via functions that are marked as 'thread safe', if you used non-lock queue, it's your responsibility to guarantee
	// that new messages will not come until we returned from this callback (for example, pingpong test).
	virtual size_t on_msg_handle(out_queue_type& msg_can)
	{
		if (!is_send_buffer_available())
			return 0;

		out_container_type tmp_can;
		msg_can.move_items_out(tmp_can, 10); //don't be too greedy, here is in a service thread, we should not block this thread for a long time

		//following statement can avoid one memory replication if the type of out_msg_type and in_msg_type are identical.
		ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->send_msg(std::move(msg), true);});
		return 1;
		//if we indeed handled some messages, do return 1, else, return 0
		//if we handled nothing, but want to re-dispatch messages immediately, return 1
	}
#else
	//following statement can avoid one memory replication if the type of out_msg_type and in_msg_type are identical.
	virtual bool on_msg_handle(out_msg_type& msg) {return send_msg(std::move(msg));}
#endif
	//msg handling end
};

//the echo server: manages echo_socket objects through object_pool and exposes the i_echo_server interface to them
class echo_server : public server_base<echo_socket, object_pool<echo_socket>, i_echo_server>
{
public:
	echo_server(service_pump& service_pump_) : server_base(service_pump_) {}

protected:
	//from i_echo_server, pure virtual function, we must implement it.
	//it intentionally does nothing, enable the puts call to observe invocations coming from echo_socket::on_recv_error
	virtual void test() {/*puts("in echo_server::test()");*/}
};

#if ASCS_HEARTBEAT_INTERVAL > 0
Y
youngwolf 已提交
167
typedef server_socket_base<packer<>, unpacker<>> normal_socket;
Y
youngwolf 已提交
168 169
#else
//demonstrate how to open heartbeat function without defining macro ASCS_HEARTBEAT_INTERVAL
Y
youngwolf 已提交
170
class normal_socket : public server_socket_base<packer<>, unpacker<>>
Y
youngwolf 已提交
171 172
{
public:
173
	normal_socket(i_server& server_) : server_socket_base<ext::packer<>, ext::unpacker<>>(server_) {}
Y
youngwolf 已提交
174
	//sometime, the default packer brings name conflict with the socket's packer member function, prefix namespace can resolve this conflict.
Y
youngwolf 已提交
175 176

protected:
Y
yang li 已提交
177
	//demo client needs heartbeat (macro ASCS_HEARTBEAT_INTERVAL been defined), please note that the interval (here is 5) must be equal to
Y
youngwolf 已提交
178 179
	//macro ASCS_HEARTBEAT_INTERVAL defined in demo client, and macro ASCS_HEARTBEAT_MAX_ABSENCE must has the same value as demo client's.
	virtual void on_connect() {start_heartbeat(5);}
Y
youngwolf 已提交
180 181 182
};
#endif

183 184 185 186 187 188 189
//demonstrate how to accept just one client at server endpoint
class normal_server : public server_base<normal_socket>
{
public:
	normal_server(service_pump& service_pump_) : server_base(service_pump_) {}

protected:
	//ask for a single asynchronous accept
	virtual int async_accept_num() {return 1;}
	//stop listening as soon as a client has been accepted, returning true keeps the accepted client
	virtual bool on_accept(object_ctype& socket_ptr) {stop_listen(); return true;}
};

//short connection socket: lets the base class handle the received message(s), then shuts the link down via force_shutdown()
class short_connection : public server_socket_base<packer<>, unpacker<>>
{
private:
	typedef server_socket_base<ext::packer<>, ext::unpacker<>> super;

public:
	short_connection(i_server& server_) : super(server_) {}

protected:
	//msg handling
#ifdef ASCS_SYNC_DISPATCH
	//do not hold msg_can for further using, return from on_msg as quickly as possible
	//access msg_can freely within this callback, it's always thread safe.
	virtual size_t on_msg(std::list<out_msg_type>& msg_can) {auto re = super::on_msg(msg_can); force_shutdown(); return re;}
#endif

#ifdef ASCS_DISPATCH_BATCH_MSG
	//do not hold msg_can for further using, access msg_can and return from on_msg_handle as quickly as possible
	//can only access msg_can via functions that are marked as 'thread safe', if you used non-lock queue, it's your responsibility to guarantee
	// that new messages will not come until we returned from this callback (for example, pingpong test).
	virtual size_t on_msg_handle(out_queue_type& msg_can) {auto re = super::on_msg_handle(msg_can); force_shutdown(); return re;}
#else
	virtual bool on_msg_handle(out_msg_type& msg) {auto re = super::on_msg_handle(msg); force_shutdown(); return re;}
#endif
	//msg handling end
};

int main(int argc, const char* argv[])
{
223
	printf("usage: %s [<service thread number=1> [<port=%d> [ip=0.0.0.0]]]\n", argv[0], ASCS_SERVER_PORT);
Y
youngwolf 已提交
224 225 226 227 228 229 230
	puts("normal server's port will be 100 larger.");
	if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
		return 0;
	else
		puts("type " QUIT_COMMAND " to end.");

	service_pump sp;
231 232 233
	echo_server echo_server_(sp); //echo server

	//demonstrate how to use singel_service
234 235
	//because of normal_socket, this server cannot support fixed_length_packer/fixed_length_unpacker and prefix_suffix_packer/prefix_suffix_unpacker,
	//the reason is these packer and unpacker need additional initializations that normal_socket not implemented, see echo_socket's constructor for more details.
236 237
	single_service_pump<normal_server> normal_server_;
	single_service_pump<server_base<short_connection>> short_server;
Y
youngwolf 已提交
238

Y
yang li 已提交
239 240 241 242
	unsigned short port = ASCS_SERVER_PORT;
	std::string ip;
	if (argc > 2)
		port = (unsigned short) atoi(argv[2]);
Y
youngwolf 已提交
243
	if (argc > 3)
Y
yang li 已提交
244 245
		ip = argv[3];

246
	normal_server_.set_server_addr(port + 100, ip);
247
	short_server.set_server_addr(port + 200, ip);
Y
yang li 已提交
248
	echo_server_.set_server_addr(port, ip);
Y
youngwolf 已提交
249 250 251 252 253 254

	auto thread_num = 1;
	if (argc > 1)
		thread_num = std::min(16, std::max(thread_num, atoi(argv[1])));

#if 3 == PACKER_UNPACKER_TYPE
Y
youngwolf 已提交
255
	global_packer->prefix_suffix("begin", "end");
Y
youngwolf 已提交
256 257 258
#endif

	sp.start_service(thread_num);
Y
yang li 已提交
259 260
	normal_server_.start_service(1);
	short_server.start_service(1);
Y
youngwolf 已提交
261 262 263
	while(sp.is_running())
	{
		std::string str;
Y
youngowlf 已提交
264
		std::getline(std::cin, str);
Y
YangLi 已提交
265 266 267
		if (str.empty())
			;
		else if (QUIT_COMMAND == str)
268
		{
Y
youngwolf 已提交
269
			sp.stop_service();
270 271 272
			normal_server_.stop_service();
			short_server.stop_service();
		}
Y
youngwolf 已提交
273 274 275 276 277
		else if (RESTART_COMMAND == str)
		{
			sp.stop_service();
			sp.start_service(thread_num);
		}
278
		else if (STATISTIC == str)
Y
youngwolf 已提交
279
		{
280
			printf("normal server, link #: " ASCS_SF ", invalid links: " ASCS_SF "\n", normal_server_.size(), normal_server_.invalid_object_size());
281
			printf("echo server, link #: " ASCS_SF ", invalid links: " ASCS_SF "\n\n", echo_server_.size(), echo_server_.invalid_object_size());
282 283 284 285
			static statistic last_stat;
			statistic this_stat = echo_server_.get_statistic();
			puts((this_stat - last_stat).to_string().data());
			last_stat = this_stat;
Y
youngwolf 已提交
286
		}
287 288
		else if (STATUS == str)
		{
289
			normal_server_.list_all_status();
290 291
			echo_server_.list_all_status();
		}
Y
youngwolf 已提交
292 293 294
		else if (LIST_ALL_CLIENT == str)
		{
			puts("clients from normal server:");
295
			normal_server_.list_all_object();
Y
youngwolf 已提交
296 297 298
			puts("clients from echo server:");
			echo_server_.list_all_object();
		}
Y
youngwolf 已提交
299 300 301 302
		else if (INCREASE_THREAD == str)
			sp.add_service_thread(1);
		else if (DECREASE_THREAD == str)
			sp.del_service_thread(1);
Y
youngwolf 已提交
303 304
		else
		{
Y
youngwolf 已提交
305 306
//			/*
			//broadcast series functions call pack_msg for each client respectively, because clients may used different protocols(so different type of packers, of course)
Y
youngwolf 已提交
307
			normal_server_.broadcast_msg(str.data(), str.size() + 1);
Y
youngwolf 已提交
308 309
			//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
			//so need \0 character when printing it.
Y
youngwolf 已提交
310
//			*/
Y
youngwolf 已提交
311
			/*
Y
youngwolf 已提交
312 313 314
			//if all clients used the same protocol, we can pack msg one time, and send it repeatedly like this:
			packer p;
			auto msg = p.pack_msg(str.data(), str.size() + 1);
315
			//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
Y
youngwolf 已提交
316 317
			//so need \0 character when printing it.
			if (!msg.empty())
Y
youngwolf 已提交
318
				((normal_server&) normal_server_).do_something_to_all([&msg](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(msg);});
Y
youngwolf 已提交
319 320
			*/
			/*
321
			//if demo client is using stream_unpacker
Y
youngwolf 已提交
322 323 324
			((normal_server&) normal_server_).do_something_to_all([&str](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(str);});
			//or
			normal_server_.broadcast_native_msg(str);
Y
youngwolf 已提交
325
			*/
Y
youngwolf 已提交
326 327 328 329 330
		}
	}

	return 0;
}