echo_server.cpp 11.5 KB
Newer Older
Y
youngwolf 已提交
1 2 3 4 5 6

#include <iostream>

//configuration
#define ASCS_SERVER_PORT		9527
#define ASCS_REUSE_OBJECT //use objects pool
Y
youngwolf 已提交
7
//#define ASCS_FREE_OBJECT_INTERVAL	60 //it's useless if ASCS_REUSE_OBJECT macro been defined
Y
YangLi 已提交
8
//#define ASCS_SYNC_DISPATCH //do not open this feature, see below for more details
Y
youngwolf 已提交
9
#define ASCS_DISPATCH_BATCH_MSG
Y
youngwolf 已提交
10
#define ASCS_ENHANCED_STABILITY
11
//#define ASCS_FULL_STATISTIC //full statistic will slightly impact efficiency
Y
YangLi 已提交
12
#define ASCS_USE_STEADY_TIMER
Y
youngwolf 已提交
13
#define ASCS_ALIGNED_TIMER
Y
youngwolf 已提交
14 15
#define ASCS_AVOID_AUTO_STOP_SERVICE
#define ASCS_DECREASE_THREAD_AT_RUNTIME
Y
youngowlf 已提交
16 17 18 19
//#define ASCS_MAX_MSG_NUM		16
//if there's a huge number of links, please reduce messge buffer via ASCS_MAX_MSG_NUM macro.
//please think about if we have 512 links, how much memory we can accupy at most with default ASCS_MAX_MSG_NUM?
//it's 2 * 1024 * 1024 * 512 = 1G
Y
youngwolf 已提交
20 21 22 23

//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE	0
//0-default packer and unpacker, head(length) + body
W
wolf 已提交
24 25 26
//1-replaceable packer and unpacker, head(length) + body
//2-fixed length packer and unpacker
//3-prefix and/or suffix packer and unpacker
Y
youngwolf 已提交
27 28

#if 1 == PACKER_UNPACKER_TYPE
Y
youngwolf 已提交
29 30 31
#if defined(_MSC_VER) && _MSC_VER <= 1800
#define ASCS_DEFAULT_PACKER replaceable_packer<shared_buffer<i_buffer>>
#else
Y
youngwolf 已提交
32
#define ASCS_DEFAULT_PACKER replaceable_packer<>
Y
youngwolf 已提交
33
#endif
Y
youngwolf 已提交
34
#define ASCS_DEFAULT_UNPACKER replaceable_unpacker<>
Y
youngwolf 已提交
35
#elif 2 == PACKER_UNPACKER_TYPE
Y
youngwolf 已提交
36 37
#undef ASCS_HEARTBEAT_INTERVAL
#define ASCS_HEARTBEAT_INTERVAL	0 //not support heartbeat
38
#define ASCS_DEFAULT_PACKER fixed_length_packer
Y
youngwolf 已提交
39 40
#define ASCS_DEFAULT_UNPACKER fixed_length_unpacker
#elif 3 == PACKER_UNPACKER_TYPE
Y
youngwolf 已提交
41 42
#undef ASCS_HEARTBEAT_INTERVAL
#define ASCS_HEARTBEAT_INTERVAL	0 //not support heartbeat
Y
youngwolf 已提交
43 44 45 46 47
#define ASCS_DEFAULT_PACKER prefix_suffix_packer
#define ASCS_DEFAULT_UNPACKER prefix_suffix_unpacker
#endif
//configuration

W
wolf 已提交
48
#include <ascs/ext/tcp.h>
Y
youngwolf 已提交
49
using namespace ascs;
W
wolf 已提交
50
using namespace ascs::tcp;
Y
youngwolf 已提交
51
using namespace ascs::ext;
W
wolf 已提交
52
using namespace ascs::ext::tcp;
Y
youngwolf 已提交
53 54 55

#define QUIT_COMMAND	"quit"
#define RESTART_COMMAND	"restart"
56
#define STATUS			"status"
Y
youngowlf 已提交
57 58 59 60
#define STATISTIC		"statistic"
#define LIST_ALL_CLIENT	"list all client"
#define INCREASE_THREAD	"increase thread"
#define DECREASE_THREAD	"decrease thread"
Y
youngwolf 已提交
61 62

//demonstrate how to use custom packer
Y
youngwolf 已提交
63
//under the default behavior, each tcp::socket has their own packer, and cause memory waste
Y
youngwolf 已提交
64 65 66 67
//at here, we make each echo_socket use the same global packer for memory saving
//notice: do not do this for unpacker, because unpacker has member variables and can't share each other
auto global_packer(std::make_shared<ASCS_DEFAULT_PACKER>());

Y
youngwolf 已提交
68
//demonstrate how to control the type of tcp::server_socket_base::server from template parameter
Y
youngwolf 已提交
69 70 71 72 73 74
class i_echo_server : public i_server
{
public:
	virtual void test() = 0;
};

W
wolf 已提交
75
class echo_socket : public server_socket_base<ASCS_DEFAULT_PACKER, ASCS_DEFAULT_UNPACKER, i_echo_server>
Y
youngwolf 已提交
76 77 78 79
{
public:
	echo_socket(i_echo_server& server_) : server_socket_base(server_)
	{
Y
youngwolf 已提交
80
		packer(global_packer);
Y
youngwolf 已提交
81 82

#if 2 == PACKER_UNPACKER_TYPE
Y
youngwolf 已提交
83
		std::dynamic_pointer_cast<ASCS_DEFAULT_UNPACKER>(unpacker())->fixed_length(1024);
Y
youngwolf 已提交
84
#elif 3 == PACKER_UNPACKER_TYPE
Y
youngwolf 已提交
85
		std::dynamic_pointer_cast<ASCS_DEFAULT_UNPACKER>(unpacker())->prefix_suffix("begin", "end");
Y
youngwolf 已提交
86 87 88 89 90 91 92 93 94 95 96 97
#endif
	}

public:
	//because we use objects pool(REUSE_OBJECT been defined), so, strictly speaking, this virtual
	//function must be rewrote, but we don't have member variables to initialize but invoke father's
	//reset() directly, so, it can be omitted, but we keep it for possibly future using
	virtual void reset() {server_socket_base::reset();}

protected:
	virtual void on_recv_error(const asio::error_code& ec)
	{
Y
youngwolf 已提交
98
		//the type of tcp::server_socket_base::server now can be controlled by derived class(echo_socket),
Y
youngwolf 已提交
99
		//which is actually i_echo_server, so, we can invoke i_echo_server::test virtual function.
Y
youngwolf 已提交
100
		get_server().test();
Y
youngwolf 已提交
101 102 103 104
		server_socket_base::on_recv_error(ec);
	}

	//msg handling: send the original msg back(echo server)
Y
YangLi 已提交
105 106
/*
#ifdef ASCS_SYNC_DISPATCH //do not open this feature
Y
YangLi 已提交
107 108
	//do not hold msg_can for further using, return from on_msg as quickly as possible
	virtual size_t on_msg(std::list<out_msg_type>& msg_can)
Y
YangLi 已提交
109 110
	{
		if (!is_send_buffer_available())
Y
YangLi 已提交
111 112
			return 0; //congestion control
		//here if we cannot handle all messages in msg_can, do not use sync message dispatching except we can bear message disordering,
Y
YangLi 已提交
113 114
		//this is because on_msg_handle can be invoked concurrently with the next on_msg (new messages arrived) and then disorder messages.
		//and do not try to handle all messages here (just for echo_server's business logic) because:
Y
YangLi 已提交
115 116
		//1. we can not use safe_send_msg as i said many times, we should not block service threads.
		//2. if we use true can_overflow to call send_msg, then buffer usage will be out of control, we should not take this risk.
Y
YangLi 已提交
117 118 119

		ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->send_msg(msg, true);});
		auto re = msg_can.size();
Y
YangLi 已提交
120
		msg_can.clear();
Y
YangLi 已提交
121 122 123 124

		return re;
	}
#endif
Y
YangLi 已提交
125
*/
Y
youngwolf 已提交
126
#ifdef ASCS_DISPATCH_BATCH_MSG
Y
YangLi 已提交
127 128
	//do not hold msg_can for further using, access msg_can and return from on_msg_handle as quickly as possible
	virtual size_t on_msg_handle(out_queue_type& msg_can)
Y
youngwolf 已提交
129 130 131 132 133
	{
		if (!is_send_buffer_available())
			return 0;

		out_container_type tmp_can;
Y
youngwolf 已提交
134 135 136
		//this manner requires the container used by the message queue can be spliced (such as std::list, but not std::vector,
		// ascs doesn't require this characteristic).
		//these code can be compiled because we used list as the container of the message queue, see macro ASCS_OUTPUT_CONTAINER for more details
Y
youngwolf 已提交
137 138 139
		//to consume all messages in msg_can, see echo_client
		msg_can.lock();
		auto begin_iter = std::begin(msg_can);
Y
youngwolf 已提交
140
		//don't be too greedy, here is in a service thread, we should not block this thread for a long time
Y
youngwolf 已提交
141
		auto end_iter = msg_can.size() > 10 ? std::next(begin_iter, 10) : std::end(msg_can);
Y
YangLi 已提交
142
		tmp_can.splice(std::end(tmp_can), msg_can, begin_iter, end_iter); //the rest messages will be dispatched via the next on_msg_handle
Y
youngwolf 已提交
143
		msg_can.unlock();
Y
youngwolf 已提交
144

145
		ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->send_msg(msg, true);});
Y
youngwolf 已提交
146 147 148
		return tmp_can.size();
	}
#else
Y
youngwolf 已提交
149
	virtual bool on_msg_handle(out_msg_type& msg) {return send_msg(msg);}
Y
youngwolf 已提交
150
#endif
Y
youngwolf 已提交
151 152 153
	//msg handling end
};

W
wolf 已提交
154
class echo_server : public server_base<echo_socket, object_pool<echo_socket>, i_echo_server>
Y
youngwolf 已提交
155 156 157 158 159 160 161 162
{
public:
	echo_server(service_pump& service_pump_) : server_base(service_pump_) {}

	//from i_echo_server, pure virtual function, we must implement it.
	virtual void test() {/*puts("in echo_server::test()");*/}
};

Y
youngwolf 已提交
163 164 165 166 167 168 169 170 171 172
#if ASCS_HEARTBEAT_INTERVAL > 0
typedef server_socket_base<packer, unpacker> normal_socket;
#else
//demonstrate how to open heartbeat function without defining macro ASCS_HEARTBEAT_INTERVAL
class normal_socket : public server_socket_base<packer, unpacker>
{
public:
	normal_socket(i_server& server_) : server_socket_base(server_) {}

protected:
Y
yang li 已提交
173
	//demo client needs heartbeat (macro ASCS_HEARTBEAT_INTERVAL been defined), please note that the interval (here is 5) must be equal to
Y
youngwolf 已提交
174 175
	//macro ASCS_HEARTBEAT_INTERVAL defined in demo client, and macro ASCS_HEARTBEAT_MAX_ABSENCE must has the same value as demo client's.
	virtual void on_connect() {start_heartbeat(5);}
Y
youngwolf 已提交
176 177 178
};
#endif

179 180 181 182 183 184 185 186 187
class short_connection : public server_socket_base<packer, unpacker>
{
public:
	short_connection(i_server& server_) : server_socket_base(server_) {}

protected:
	//msg handling
#ifdef ASCS_SYNC_DISPATCH
	//do not hold msg_can for further using, return from on_msg as quickly as possible
Y
yang li 已提交
188
	virtual size_t on_msg(std::list<out_msg_type>& msg_can) {auto re = server_socket_base::on_msg(msg_can); force_shutdown(); return re;}
189 190 191 192
#endif

#ifdef ASCS_DISPATCH_BATCH_MSG
	//do not hold msg_can for further using, access msg_can and return from on_msg_handle as quickly as possible
Y
yang li 已提交
193
	virtual size_t on_msg_handle(out_queue_type& msg_can) {auto re = server_socket_base::on_msg_handle(msg_can); force_shutdown(); return re;}
194 195 196 197 198 199
#else
	virtual bool on_msg_handle(out_msg_type& msg) {auto re = server_socket_base::on_msg_handle(msg); force_shutdown(); return re;}
#endif
	//msg handling end
};

Y
youngwolf 已提交
200 201
int main(int argc, const char* argv[])
{
202
	printf("usage: %s [<service thread number=1> [<port=%d> [ip=0.0.0.0]]]\n", argv[0], ASCS_SERVER_PORT);
Y
youngwolf 已提交
203 204 205 206 207 208 209
	puts("normal server's port will be 100 larger.");
	if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
		return 0;
	else
		puts("type " QUIT_COMMAND " to end.");

	service_pump sp;
Y
youngwolf 已提交
210
	//only need a simple server? you can directly use server or tcp::server_base, because of normal_socket,
Y
youngwolf 已提交
211 212 213
	//this server cannot support fixed_length_packer/fixed_length_unpacker and prefix_suffix_packer/prefix_suffix_unpacker,
	//the reason is these packer and unpacker need additional initializations that normal_socket not implemented,
	//see echo_socket's constructor for more details.
214 215
	server_base<normal_socket> normal_server(sp);
	server_base<short_connection> short_server(sp);
Y
youngwolf 已提交
216 217
	echo_server echo_server_(sp); //echo server

Y
yang li 已提交
218 219 220 221
	unsigned short port = ASCS_SERVER_PORT;
	std::string ip;
	if (argc > 2)
		port = (unsigned short) atoi(argv[2]);
Y
youngwolf 已提交
222
	if (argc > 3)
Y
yang li 已提交
223 224 225 226 227
		ip = argv[3];

	normal_server.set_server_addr(port + 100, ip);
	short_server.set_server_addr(port + 101, ip);
	echo_server_.set_server_addr(port, ip);
Y
youngwolf 已提交
228 229 230 231 232 233

	auto thread_num = 1;
	if (argc > 1)
		thread_num = std::min(16, std::max(thread_num, atoi(argv[1])));

#if 3 == PACKER_UNPACKER_TYPE
Y
youngwolf 已提交
234
	global_packer->prefix_suffix("begin", "end");
Y
youngwolf 已提交
235 236 237 238 239 240
#endif

	sp.start_service(thread_num);
	while(sp.is_running())
	{
		std::string str;
Y
youngowlf 已提交
241
		std::getline(std::cin, str);
Y
YangLi 已提交
242 243 244
		if (str.empty())
			;
		else if (QUIT_COMMAND == str)
Y
youngwolf 已提交
245 246 247 248 249 250
			sp.stop_service();
		else if (RESTART_COMMAND == str)
		{
			sp.stop_service();
			sp.start_service(thread_num);
		}
251
		else if (STATISTIC == str)
Y
youngwolf 已提交
252
		{
253
			printf("normal server, link #: " ASCS_SF ", invalid links: " ASCS_SF "\n", normal_server.size(), normal_server.invalid_object_size());
254
			printf("echo server, link #: " ASCS_SF ", invalid links: " ASCS_SF "\n\n", echo_server_.size(), echo_server_.invalid_object_size());
Y
youngwolf 已提交
255 256
			puts(echo_server_.get_statistic().to_string().data());
		}
257 258
		else if (STATUS == str)
		{
259
			normal_server.list_all_status();
260 261
			echo_server_.list_all_status();
		}
Y
youngwolf 已提交
262 263 264
		else if (LIST_ALL_CLIENT == str)
		{
			puts("clients from normal server:");
265
			normal_server.list_all_object();
Y
youngwolf 已提交
266 267 268
			puts("clients from echo server:");
			echo_server_.list_all_object();
		}
Y
youngwolf 已提交
269 270 271 272
		else if (INCREASE_THREAD == str)
			sp.add_service_thread(1);
		else if (DECREASE_THREAD == str)
			sp.del_service_thread(1);
Y
youngwolf 已提交
273 274
		else
		{
Y
youngwolf 已提交
275 276
//			/*
			//broadcast series functions call pack_msg for each client respectively, because clients may used different protocols(so different type of packers, of course)
277
			normal_server.broadcast_msg(str.data(), str.size() + 1, false);
Y
youngwolf 已提交
278 279
			//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
			//so need \0 character when printing it.
Y
youngwolf 已提交
280
//			*/
Y
youngwolf 已提交
281
			/*
Y
youngwolf 已提交
282 283 284
			//if all clients used the same protocol, we can pack msg one time, and send it repeatedly like this:
			packer p;
			auto msg = p.pack_msg(str.data(), str.size() + 1);
285
			//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
Y
youngwolf 已提交
286 287
			//so need \0 character when printing it.
			if (!msg.empty())
288
				normal_server.do_something_to_all([&msg](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(msg);});
Y
youngwolf 已提交
289 290
			*/
			/*
291
			//if demo client is using stream_unpacker
292
			normal_server.do_something_to_all([&str](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(str);});
Y
youngwolf 已提交
293
			*/
Y
youngwolf 已提交
294 295 296 297 298
		}
	}

	return 0;
}