提交 576ad833 编写于 作者: S Shiyuan Shang-Guan

update


Former-commit-id: f3344a44fc17076fdbf1e1795d50289bfcd22623
上级 d71bcc88
......@@ -71,16 +71,19 @@ void EpollCommNet::SendActorMsg(int64_t dst_machine_id, const ActorMsg& actor_ms
SocketMsg msg;
msg.msg_type = SocketMsgType::kActor;
msg.actor_msg = actor_msg;
GetSocketHelper(dst_machine_id, 0)->AsyncWrite(msg);
GetSocketHelper(dst_machine_id, epoll_conf_.link_num() - 1)->AsyncWrite(msg);
}
void EpollCommNet::RequestRead(int64_t dst_machine_id, void* src_token, void* dst_token,
void* read_id) const {
int32_t total_byte_size = static_cast<const SocketMemDesc*>(src_token)->byte_size;
CHECK_GT(total_byte_size, 0);
int32_t part_length = (total_byte_size + epoll_conf_.link_num() - 1) / epoll_conf_.link_num();
part_length = RoundUp(part_length, epoll_conf_.msg_segment_kbyte() * 1024);
int32_t part_num = (total_byte_size + part_length - 1) / part_length;
CHECK_LE(part_num, epoll_conf_.link_num());
for (int32_t link_i = 0; link_i < part_num; ++link_i) {
int32_t byte_size = (total_byte_size > part_length) ? (part_length) : (total_byte_size);
CHECK_GT(byte_size, 0);
......@@ -117,7 +120,7 @@ void EpollCommNet::InitSockets() {
int64_t this_machine_id = Global<MachineCtx>::Get()->this_machine_id();
auto this_machine = Global<JobDesc>::Get()->resource().machine(this_machine_id);
int64_t total_machine_num = Global<JobDesc>::Get()->TotalMachineNum();
machine_id2sockfds_.assign(total_machine_num * epoll_conf_.link_num(), -1);
machine_link_id2sockfds_.assign(total_machine_num * epoll_conf_.link_num(), -1);
sockfd2helper_.clear();
size_t poller_idx = 0;
auto NewSocketHelper = [&](int32_t sockfd) {
......@@ -159,7 +162,7 @@ void EpollCommNet::InitSockets() {
PCHECK(connect(sockfd, reinterpret_cast<sockaddr*>(&peer_sockaddr), sizeof(peer_sockaddr))
== 0);
CHECK(sockfd2helper_.emplace(sockfd, NewSocketHelper(sockfd)).second);
machine_id2sockfds_.at(peer_mchn_id * epoll_conf_.link_num() + link_i) = sockfd;
machine_link_id2sockfds_.at(peer_mchn_id * epoll_conf_.link_num() + link_i) = sockfd;
}
}
......@@ -172,23 +175,23 @@ void EpollCommNet::InitSockets() {
PCHECK(sockfd != -1);
CHECK(sockfd2helper_.emplace(sockfd, NewSocketHelper(sockfd)).second);
int64_t peer_mchn_id = GetMachineId(peer_sockaddr);
machine_id2sockfds_.at(peer_mchn_id * epoll_conf_.link_num() + link_i) = sockfd;
machine_link_id2sockfds_.at(peer_mchn_id * epoll_conf_.link_num() + link_i) = sockfd;
}
}
PCHECK(close(listen_sockfd) == 0);
ClearPort(this_machine_id);
// useful log
FOR_RANGE(int64_t, machine_id, 0, total_machine_num) {
for (int64_t peer_mchn_id : peer_machine_id()) {
FOR_RANGE(int32_t, link_i, 0, epoll_conf_.link_num()) {
LOG(INFO) << "machine: " << machine_id << ", link index: " << link_i << ", sockfd: "
<< machine_id2sockfds_.at(machine_id * epoll_conf_.link_num() + link_i);
LOG(INFO) << "machine: " << peer_mchn_id << ", link index: " << link_i << ", sockfd: "
<< machine_link_id2sockfds_.at(peer_mchn_id * epoll_conf_.link_num() + link_i);
}
}
}
SocketHelper* EpollCommNet::GetSocketHelper(int64_t machine_id, int32_t link_index) const {
int32_t sockfd = machine_id2sockfds_.at(machine_id * epoll_conf_.link_num() + link_index);
int32_t sockfd = machine_link_id2sockfds_.at(machine_id * epoll_conf_.link_num() + link_index);
return sockfd2helper_.at(sockfd);
}
......@@ -200,7 +203,7 @@ void EpollCommNet::DoRead(void* read_id, int64_t src_machine_id, void* src_token
msg.request_write_msg.dst_token = dst_token;
msg.request_write_msg.read_id = read_id;
dst_token2part_done_cnt_.at(dst_token) = 0;
GetSocketHelper(src_machine_id, 0)->AsyncWrite(msg);
GetSocketHelper(src_machine_id, epoll_conf_.link_num() - 1)->AsyncWrite(msg);
}
void EpollCommNet::PartReadDone(void* read_id, void* dst_token, int32_t part_num) {
......
......@@ -32,7 +32,8 @@ class EpollCommNet final : public CommNetIf<SocketMemDesc> {
const EpollConf& epoll_conf_;
std::vector<IOEventPoller*> pollers_;
std::vector<int32_t> machine_id2sockfds_;
// machine_link_id = machine_id * epoll_conf_.link_num() + link_id
std::vector<int64_t> machine_link_id2sockfds_;
HashMap<int, SocketHelper*> sockfd2helper_;
HashMap<void*, std::atomic<int32_t>> dst_token2part_done_cnt_;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册