未验证 提交 a7ab7ec1 编写于 作者: S Shenghang Tsai 提交者: GitHub

Backport bug fixes for distributed run from multi node ci (#3765)

* cherry picking changes from multi node ci

* call original exit in monkey patch

* add bazel_cache for XLA build

* Revert "add bazel_cache for XLA build"

This reverts commit 5903717d491fc5be8b1fd19cddadd826863c0d76.

* global

* dont del
Co-authored-by: Noneflow-bot <69100618+oneflow-bot@users.noreply.github.com>
上级 a722940a
......@@ -15,9 +15,11 @@ limitations under the License.
*/
#include <atomic>
#include <pybind11/pybind11.h>
#include "oneflow/core/job/env_global_objects_scope.h"
#include "oneflow/core/job/job_build_and_infer_ctx_mgr.h"
#include "oneflow/cfg/pybind_module_registry.h"
#include "oneflow/api/python/of_api_registry.h"
#include "oneflow/core/job/cluster_instruction.h"
namespace py = pybind11;
......@@ -31,6 +33,11 @@ uint64_t NewTokenId() {
PYBIND11_MODULE(oneflow_api, m) {
m.def("EagerExecutionEnabled", []() { return EagerExecutionEnabled(); });
// Python binding for ClusterInstruction::MasterSendAbort().
// Does nothing when no EnvGlobalObjectsScope global is currently registered.
m.def("MasterSendAbort", []() {
  const bool env_ready = Global<EnvGlobalObjectsScope>::Get() != nullptr;
  if (!env_ready) { return; }
  ClusterInstruction::MasterSendAbort();
});
m.def("NewTokenId", &NewTokenId);
::oneflow::cfg::Pybind11ModuleRegistry().ImportAll(m);
::oneflow::OneflowModuleRegistry().ImportAll(m);
......
......@@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/comm_network/epoll/epoll_comm_network.h"
#include "glog/logging.h"
#include "oneflow/core/control/ctrl_client.h"
#include "oneflow/core/job/machine_context.h"
#include "oneflow/core/job/resource_desc.h"
......@@ -32,19 +33,24 @@ sockaddr_in GetSockAddr(const std::string& addr, uint16_t port) {
sockaddr_in sa;
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
PCHECK(inet_pton(AF_INET, addr.c_str(), &(sa.sin_addr)) == 1);
PCHECK(inet_pton(AF_INET, addr.c_str(), &(sa.sin_addr)) == 1)
<< "addr: " << addr << ", port: " << port;
return sa;
}
// Binds `listen_sockfd` to 0.0.0.0:`listen_port` and, if the bind succeeds,
// puts the socket into listening mode.
//
// SO_REUSEADDR is enabled on the socket before bind(); failing to set it is fatal.
// `total_machine_num` is handed to listen() as the backlog.
//
// Returns the raw bind() result: 0 when the socket is bound and listening,
// non-zero when bind() failed with EACCES or EADDRINUSE. Any other bind errno,
// or a listen() failure, is fatal.
// NOTE(review): the non-fatal return presumably lets the caller try another
// port -- confirm against the call site.
int SockListen(int listen_sockfd, uint16_t listen_port, int32_t total_machine_num) {
  sockaddr_in sa = GetSockAddr("0.0.0.0", listen_port);
  int reuse = 1;
  int ret_setopt = setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
  CHECK_EQ(ret_setopt, 0);
  int bind_result = bind(listen_sockfd, reinterpret_cast<sockaddr*>(&sa), sizeof(sa));
  if (bind_result == 0) {
    PCHECK(listen(listen_sockfd, total_machine_num) == 0);
    LOG(INFO) << "CommNet:Epoll listening on "
              << "0.0.0.0:" + std::to_string(listen_port);
  } else {
    // Single check carrying the diagnostic; a preceding bare PCHECK on the same
    // condition would abort first and hide this message.
    PCHECK(errno == EACCES || errno == EADDRINUSE) << "SockListen errno: " << errno;
  }
  return bind_result;
}
......
......@@ -68,6 +68,8 @@ Maybe<void> Cluster::WorkerLoop() {
ClusterInstruction::WorkerReceiveInstruction(mut_cluster_instruction.get());
if (mut_cluster_instruction->has_cluster_ctrl_halt()) {
break;
} else if (mut_cluster_instruction->has_cluster_ctrl_abort()) {
LOG(FATAL) << "received abort instruction";
} else if (mut_cluster_instruction->has_cluster_ctrl_session_start()) {
ClusterInstruction::NewSessionBarrier();
AsyncRunLazyJobSet(&lazy_runtime_thread);
......
......@@ -116,6 +116,13 @@ void ClusterInstruction::MasterSendHalt() {
HaltBarrier();
}
void ClusterInstruction::MasterSendAbort() {
LOG(ERROR) << "sending abort instruction";
ClusterInstructionProto cluster_instruction;
cluster_instruction.mutable_cluster_ctrl_abort();
PushClusterInstruction(cluster_instruction);
}
void ClusterInstruction::MasterSendEagerInstruction(
const ClusterInstructionProto& cluster_instruction) {
CHECK(cluster_instruction.has_eager_instruction());
......
......@@ -23,6 +23,7 @@ namespace oneflow {
struct ClusterInstruction final {
static void MasterSendSessionStart();
static void MasterSendHalt();
static void MasterSendAbort();
static void MasterSendEagerInstruction(const ClusterInstructionProto& cluster_instruction);
static void MasterSendEagerSync();
static void WorkerReceiveInstruction(ClusterInstructionProto* cluster_instruction);
......
......@@ -5,13 +5,15 @@ import "oneflow/core/eager/eager_instruction.proto";
// Payload-free control markers; each is used as one case of
// ClusterInstructionProto.instruction_type.

// Signals the start of a session.
message ClusterCtrlSessionStart {}
// Normal exit.
message ClusterCtrlHalt {}
// Error exit.
message ClusterCtrlAbort {}
// NOTE(review): presumably a synchronization point for eager instructions -- confirm.
message ClusterCtrlEagerSync {}
// Envelope for a single cluster instruction. Exactly one member of
// `instruction_type` is set per message.
message ClusterInstructionProto {
  oneof instruction_type {
    ClusterCtrlSessionStart cluster_ctrl_session_start = 1;
    ClusterCtrlHalt cluster_ctrl_halt = 2;  // normal exit
    eager.EagerInstruction eager_instruction = 3;
    ClusterCtrlEagerSync cluster_ctrl_eager_sync = 4;
    ClusterCtrlAbort cluster_ctrl_abort = 5;  // error exit
  }
}
......@@ -66,6 +66,21 @@ atexit.register(oneflow.python.framework.session_context.TryCloseDefaultSession)
atexit.register(oneflow.python.framework.python_interpreter_util.SetShuttingDown)
del atexit
import sys

# Keep a handle on the real sys.exit so the wrapper can delegate to it.
__original_exit__ = sys.exit


def custom_exit(returncode=None):
    """Replacement for ``sys.exit`` that sends an abort instruction on error exits.

    Args:
        returncode: same meaning as the argument of ``sys.exit``. It is
            optional, so a bare ``sys.exit()`` keeps working. ``None`` and
            ``0`` denote success; anything else (non-zero int, error string,
            ...) is an error exit.

    Raises:
        SystemExit: always, via the original ``sys.exit``.
    """
    # None means "exit successfully" for sys.exit, so it must not trigger an abort.
    if returncode is not None and returncode != 0:
        oneflow_api.MasterSendAbort()
    __original_exit__(returncode)


sys.exit = custom_exit

del custom_exit
del sys
del absolute_import
del oneflow
del python
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册