pingpong_client.cpp 6.6 KB
Newer Older
Y
youngwolf 已提交
1 2 3 4

#include <iostream>

//configuration
Y
youngwolf 已提交
5
#define ASCS_SERVER_PORT	9527
Y
youngwolf 已提交
6
#define ASCS_REUSE_OBJECT //use objects pool
Y
youngwolf 已提交
7
#define ASCS_DELAY_CLOSE	5 //define this to avoid hooks for async call (and slightly improve efficiency)
Y
youngwolf 已提交
8
#define ASCS_SYNC_DISPATCH
Y
youngwolf 已提交
9 10 11
//#define ASCS_WANT_MSG_SEND_NOTIFY
#define ASCS_MSG_BUFFER_SIZE 65536
#define ASCS_DEFAULT_UNPACKER stream_unpacker //non-protocol
Y
youngwolf 已提交
12
#define ASCS_DECREASE_THREAD_AT_RUNTIME
Y
youngwolf 已提交
13 14
//configuration

W
wolf 已提交
15
#include <ascs/ext/tcp.h>
Y
youngwolf 已提交
16
using namespace ascs;
W
wolf 已提交
17
using namespace ascs::tcp;
Y
youngwolf 已提交
18
using namespace ascs::ext;
W
wolf 已提交
19
using namespace ascs::ext::tcp;
Y
youngwolf 已提交
20 21 22 23 24 25

#ifdef _MSC_VER
#define atoll _atoi64
#endif

#define QUIT_COMMAND	"quit"
26
#define STATUS			"status"
Y
youngowlf 已提交
27 28 29 30
#define STATISTIC		"statistic"
#define LIST_ALL_CLIENT	"list all client"
#define INCREASE_THREAD	"increase thread"
#define DECREASE_THREAD	"decrease thread"
Y
youngwolf 已提交
31

32
cpu_timer begin_time;
Y
youngwolf 已提交
33 34
std::atomic_ushort completed_session_num;

Y
youngowlf 已提交
35
class echo_socket : public client_socket
Y
youngwolf 已提交
36 37
{
public:
Y
youngwolf 已提交
38
	echo_socket(i_matrix& matrix_) : client_socket(matrix_) {}
Y
youngwolf 已提交
39 40 41 42 43 44 45

	void begin(size_t msg_num, const char* msg, size_t msg_len)
	{
		total_bytes = msg_len;
		total_bytes *= msg_num;
		send_bytes = recv_bytes = 0;

46
		send_native_msg(msg, msg_len, false);
Y
youngwolf 已提交
47 48 49
	}

protected:
Y
youngowlf 已提交
50
	virtual void on_connect() {asio::ip::tcp::no_delay option(true); lowest_layer().set_option(option); client_socket::on_connect();}
Y
youngwolf 已提交
51

52
	//msg handling, must define macro ASCS_SYNC_DISPATCH
Y
youngwolf 已提交
53
	//do not hold msg_can for further usage, access msg_can and return from on_msg as quickly as possible
Y
yang li 已提交
54
	//access msg_can freely within this callback, it's always thread safe.
Y
youngwolf 已提交
55
	virtual size_t on_msg(std::list<out_msg_type>& msg_can)
Y
youngwolf 已提交
56 57
	{
		ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
Y
YangLi 已提交
58 59
		msg_can.clear(); //if we left behind some messages in msg_can, they will be dispatched via on_msg_handle asynchronously, which means it's
		//possible that on_msg_handle be invoked concurrently with the next on_msg (new messages arrived) and then disorder messages.
Y
YangLi 已提交
60 61
		//here we always consumed all messages, so we can use sync message dispatching, otherwise, we should not use sync message dispatching
		//except we can bear message disordering.
Y
youngwolf 已提交
62

63
		return 1;
Y
youngwolf 已提交
64 65
	}
	//msg handling end
Y
youngwolf 已提交
66 67 68 69 70 71

#ifdef ASCS_WANT_MSG_SEND_NOTIFY
	virtual void on_msg_send(in_msg_type& msg)
	{
		send_bytes += msg.size();
		if (send_bytes < total_bytes)
Y
youngwolf 已提交
72
			direct_send_msg(std::move(msg), true);
Y
youngwolf 已提交
73 74 75 76 77 78 79 80 81 82
	}

private:
	void handle_msg(out_msg_ctype& msg)
	{
		recv_bytes += msg.size();
		if (recv_bytes >= total_bytes && 0 == --completed_session_num)
			begin_time.stop();
	}
#else
83
private:
Y
youngwolf 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96
	void handle_msg(out_msg_type& msg)
	{
		if (0 == total_bytes)
			return;

		recv_bytes += msg.size();
		if (recv_bytes >= total_bytes)
		{
			total_bytes = 0;
			if (0 == --completed_session_num)
				begin_time.stop();
		}
		else
Y
youngwolf 已提交
97
			direct_send_msg(std::move(msg), true);
Y
yang li 已提交
98
		//if the type of out_msg_type and in_msg_type are not identical, the compilation will fail, then you should use send_native_msg instead.
Y
youngwolf 已提交
99 100 101 102 103 104 105
	}
#endif

private:
	uint64_t total_bytes, send_bytes, recv_bytes;
};

Y
youngwolf 已提交
106
class echo_client : public multi_client_base<echo_socket>
Y
youngwolf 已提交
107 108
{
public:
Y
youngwolf 已提交
109
	echo_client(service_pump& service_pump_) : multi_client_base<echo_socket>(service_pump_) {}
Y
youngwolf 已提交
110

Y
youngwolf 已提交
111
	void begin(size_t msg_num, const char* msg, size_t msg_len) {do_something_to_all([&](object_ctype& item) {item->begin(msg_num, msg, msg_len);});}
Y
youngwolf 已提交
112 113 114 115
};

int main(int argc, const char* argv[])
{
116
	printf("usage: %s [<service thread number=1> [<port=%d> [<ip=%s> [link num=16]]]]\n", argv[0], ASCS_SERVER_PORT, ASCS_SERVER_IP);
Y
youngwolf 已提交
117 118 119 120 121 122 123 124 125 126
	if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
		return 0;
	else
		puts("type " QUIT_COMMAND " to end.");

	///////////////////////////////////////////////////////////
	size_t link_num = 16;
	if (argc > 4)
		link_num = std::min(ASCS_MAX_OBJECT_NUM, std::max(atoi(argv[4]), 1));

Y
YangLi 已提交
127
	printf("exec: pingpong_client with " ASCS_SF " links\n", link_num);
Y
youngwolf 已提交
128 129 130 131 132
	///////////////////////////////////////////////////////////

	service_pump sp;
	echo_client client(sp);

Y
youngwolf 已提交
133 134
//	argv[3] = "::1" //ipv6
//	argv[3] = "127.0.0.1" //ipv4
Y
youngwolf 已提交
135 136 137 138 139 140
	std::string ip = argc > 3 ? argv[3] : ASCS_SERVER_IP;
	unsigned short port = argc > 2 ? atoi(argv[2]) : ASCS_SERVER_PORT;

	auto thread_num = 1;
	if (argc > 1)
		thread_num = std::min(16, std::max(thread_num, atoi(argv[1])));
141 142 143
	//add one thread will seriously impact IO throughput when doing performance benchmark, this is because the business logic is very simple (send original messages back,
	//or just add up total message size), under this scenario, just one service thread without receiving buffer will obtain the best IO throughput.
	//the server has such behavior too.
Y
youngwolf 已提交
144 145

	for (size_t i = 0; i < link_num; ++i)
Y
youngwolf 已提交
146
		client.add_socket(port, ip);
Y
youngwolf 已提交
147 148 149 150 151 152

	sp.start_service(thread_num);
	while(sp.is_running())
	{
		std::string str;
		std::getline(std::cin, str);
Y
YangLi 已提交
153 154 155
		if (str.empty())
			;
		else if (QUIT_COMMAND == str)
Y
youngwolf 已提交
156
			sp.stop_service();
157
		else if (STATISTIC == str)
Y
youngwolf 已提交
158
		{
159
			printf("link #: " ASCS_SF ", valid links: " ASCS_SF ", invalid links: " ASCS_SF "\n\n", client.size(), client.valid_size(), client.invalid_object_size());
Y
youngwolf 已提交
160 161
			puts(client.get_statistic().to_string().data());
		}
162 163 164 165
		else if (STATUS == str)
			client.list_all_status();
		else if (LIST_ALL_CLIENT == str)
			client.list_all_object();
Y
youngwolf 已提交
166 167 168 169
		else if (INCREASE_THREAD == str)
			sp.add_service_thread(1);
		else if (DECREASE_THREAD == str)
			sp.del_service_thread(1);
Y
YangLi 已提交
170
		else
Y
youngwolf 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
		{
			size_t msg_num = 1024;
			size_t msg_len = 1024; //must greater than or equal to sizeof(size_t)
			auto msg_fill = '0';

			auto parameters = split_string(str);
			auto iter = std::begin(parameters);
			if (iter != std::end(parameters)) msg_num = std::max((size_t) atoll(iter++->data()), (size_t) 1);
			if (iter != std::end(parameters)) msg_len = std::min((size_t) ASCS_MSG_BUFFER_SIZE, std::max((size_t) atoi(iter++->data()), (size_t) 1));
			if (iter != std::end(parameters)) msg_fill = *iter++->data();

			printf("test parameters after adjustment: " ASCS_SF " " ASCS_SF " %c\n", msg_num, msg_len, msg_fill);
			puts("performance test begin, this application will have no response during the test!");

			completed_session_num = (unsigned short) link_num;
			auto init_msg = new char[msg_len];
			memset(init_msg, msg_fill, msg_len);
			client.begin(msg_num, init_msg, msg_len);
			begin_time.restart();

			while (0 != completed_session_num)
				std::this_thread::sleep_for(std::chrono::milliseconds(50));

			uint64_t total_msg_bytes = link_num; total_msg_bytes *= msg_len; total_msg_bytes *= msg_num;
Y
youngwolf 已提交
195 196
			printf("finished in %f seconds, TPS: %f(*2), speed: %f(*2) MBps.\n",
				begin_time.elapsed(), link_num * msg_num / begin_time.elapsed(), total_msg_bytes / begin_time.elapsed() / 1024 / 1024);
Y
youngwolf 已提交
197 198 199 200 201 202 203

			delete[] init_msg;
		}
	}

    return 0;
}