diff --git a/oneflow/core/comm_network/iocp/io_worker.cpp b/oneflow/core/comm_network/iocp/io_worker.cpp index 83eea80bff80c468fca7e9d9a425728ffcd06e3d..02b58ebb7f8e8cb2a3ca44780a4c1e13afdd0168 100644 --- a/oneflow/core/comm_network/iocp/io_worker.cpp +++ b/oneflow/core/comm_network/iocp/io_worker.cpp @@ -108,13 +108,15 @@ void IOWorker::Start() { } void IOWorker::Stop() { - IOData* stop_io_data = new IOData; - stop_io_data->IO_type = IOType::kStop; - memset(&(stop_io_data->overlapped), 0, sizeof(OVERLAPPED)); - ResetIODataBuff(stop_io_data); - PostQueuedCompletionStatus(completion_port_, 0, - machine_id2socket_[this_machine_id_], - reinterpret_cast(stop_io_data)); + for (size_t i = 0; i < num_of_concurrent_threads_; ++i) { + IOData* stop_io_data = new IOData; + stop_io_data->IO_type = IOType::kStop; + memset(&(stop_io_data->overlapped), 0, sizeof(OVERLAPPED)); + ResetIODataBuff(stop_io_data); + PostQueuedCompletionStatus(completion_port_, 0, i, + reinterpret_cast(stop_io_data)); + LOG(INFO) << "Post stop request " << i << " to IOCP\n"; + } } void IOWorker::InitSockets() { @@ -124,7 +126,6 @@ void IOWorker::InitSockets() { SOCKET listen_socket = socket(AF_INET, SOCK_STREAM, 0); PCHECK(listen_socket != INVALID_SOCKET) << "socket failed with error:" << WSAGetLastError() << "\n"; - machine_id2socket_[this_machine_id_] = listen_socket; uint16_t this_listen_port = 1024; uint16_t listen_port_max = std::numeric_limits::max(); for (; this_listen_port < listen_port_max; ++this_listen_port) { @@ -204,6 +205,7 @@ DWORD IOWorker::ThreadProc() { switch (io_data_ptr->IO_type) { case IOType::kStop: { delete io_data_ptr; + LOG(INFO) << "stop IOworker " << completion_key << " \n"; return 0; } case IOType::kRecvMsgHead: {