Unverified  Commit 7c07181c authored by leaves-zwx, committed by GitHub

DeviceId/StreamId/TaskId part 1 (#4226)

* XXId structs and IdUtil

* rm useless header

* update id_util by discuss

* update generate common thrd id and independent thrd id by IdUtil api

* minor update

* use IdUtil to generate task id in UpdateTaskId

* Global<IdUtil>

* emplace CommNetThrdId and TickTockThrdId call

* implement IDMgr MemZoneId related api with IdUtil MemZoneId api

* add GenerateChainId api

* replace IDMgr api with IdUtil

* rm useless header

* revert IDMgr mem_zone_id api

* rm redefinition of GetGpuPhyIdFromMemZoneId

* modify by review comment

* safety modification

* def TaskType hash function

* rm old test

* fix by self review

* change name

* fix typo and enhance error info

* refactor thread manager

* more check

* rm AllocateCpuThrdIdEvenly

* refactor StreamId and rm IdUtil

* stream index generator

* modify by review

* update stream index

* update id util

* update comm net task node

* add TaskIdGenerator

* update task id generation

* replace gen thrd_id in logical node

* replace GetGpuComputeThrdId in boxing sub task graph builder

* replace h2d and d2h thrd_id in CopyHdTaskNode

* replace h2d and d2h thrd_id in SliceBoxingSubTskGphBuilder

* update id_util header

* CHECK NOTNULL stream index generator

* add chain_id_generator

* rm IdUtil Global New

* rm stream type in thread manager

* CHECK_NOTNULL stream_index_generator in logical node

* update id manager

* update id_util

* fix compile errors

* tidy code

* tidy code

* revert format

* mv std::hash<TaskType> to task_node.h

* use unique_ptr to manage thread

* fix typo

* format

* modify by review

* rm chain id generator

* move id serialization to independent implementation

* rm useless friend

* fix compile error under gcc 4.8

* rm IsXxxStreamIndex

* rm deprecated api in IDMgr

* fix bug in CPUStreamIndexGenerator::GenerateComputeStreamIndex

* refine id structs

* refine id struct serialization

* refine task id generator

* refine StreamIndexGeneratorManager

* refine copy task node

* refine collective boxing sub task graph builder

* refine slice boxing sub task graph builder

* refine naive b2p sub task graph builder

* refine logical node

* refine id manager

* refine thread manager

* rm useless comment

* remove magic number

* revise header to be compatible with cpu-only compilation

* more readable

* fix bug

* refine code

* use HashCombine

* replace type of bit shift const value with size_t

* rm ProcessId and make rank as member of DeviceId

* update id serialization with ProcessId update

* make type definition local namespace

* rm ProcessId in task graph

* update DeviceId usage in logical node

* update DeviceId usage in id manager

* update rank usage in ThreadMgr

* minor change

* detail modification

* tidy header

* tidy header
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
Parent fb3a7790
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_COMMON_ID_UTIL_H_
#define ONEFLOW_CORE_COMMON_ID_UTIL_H_
#include "oneflow/core/common/util.h"
#include "oneflow/core/common/device_type.pb.h"
namespace oneflow {
// TaskId encoding (may be extended to 128 bits in the future)
// |            rank            | device_type | device_index  |                           |
// | ----------- 19 ----------- | ---- 5 ---- | ----- 7 ----- |                           |
// |                         DeviceId                         | stream_index |            |
// | ------------------------- 31 --------------------------- | ---- 12 ---- |            |
// |                                 StreamId                                 | task_index |
// | -------------------------------- 43 ----------------------------------- | --- 21 --- |
// |                                        TaskId                                        |
// | ----------------------------------- 64 bit ----------------------------------------- |
class DeviceId {
public:
using rank_t = uint32_t;
using device_index_t = uint32_t;
constexpr static size_t kRankBits = 19;
constexpr static size_t kDeviceTypeBits = 5;
constexpr static size_t kDeviceIndexBits = 7;
constexpr static rank_t kMaxRank = (rank_t{1} << kRankBits) - rank_t{1};
constexpr static size_t kMaxDeviceTypeVal = (size_t{1} << kDeviceTypeBits) - size_t{1};
constexpr static device_index_t kMaxDeviceIndex =
(device_index_t{1} << kDeviceIndexBits) - device_index_t{1};
constexpr static device_index_t kCPUDeviceIndex = 0;
DeviceId(rank_t rank, DeviceType device_type, device_index_t device_index)
: rank_(rank), device_type_(device_type), device_index_(device_index) {
CHECK_LE(rank, kMaxRank);
CHECK_LE(static_cast<size_t>(device_type), kMaxDeviceTypeVal);
CHECK_LE(device_index, kMaxDeviceIndex);
}
rank_t rank() const { return rank_; }
DeviceType device_type() const { return device_type_; }
device_index_t device_index() const { return device_index_; }
bool operator==(const DeviceId& rhs) const {
return rank_ == rhs.rank_ && device_type_ == rhs.device_type_
&& device_index_ == rhs.device_index_;
}
bool operator!=(const DeviceId& rhs) const { return !(*this == rhs); }
size_t hash() const {
size_t hash = std::hash<rank_t>{}(rank_);
HashCombine(&hash, std::hash<size_t>{}(static_cast<size_t>(device_type_)));
HashCombine(&hash, std::hash<device_index_t>{}(device_index_));
return hash;
}
private:
rank_t rank_;
DeviceType device_type_;
device_index_t device_index_;
};
class StreamId {
public:
using stream_index_t = uint32_t;
constexpr static size_t kStreamIndexBits = 12;
constexpr static stream_index_t kMaxStreamIndex =
(stream_index_t{1} << kStreamIndexBits) - stream_index_t{1};
StreamId(const DeviceId& device_id, stream_index_t stream_index)
: device_id_(device_id), stream_index_(stream_index) {
CHECK_LE(stream_index, kMaxStreamIndex);
}
const DeviceId& device_id() const { return device_id_; }
stream_index_t stream_index() const { return stream_index_; }
bool operator==(const StreamId& rhs) const {
return device_id_ == rhs.device_id_ && stream_index_ == rhs.stream_index_;
}
bool operator!=(const StreamId& rhs) const { return !(*this == rhs); }
size_t hash() const {
size_t hash = device_id_.hash();
HashCombine(&hash, std::hash<stream_index_t>{}(stream_index_));
return hash;
}
private:
DeviceId device_id_;
stream_index_t stream_index_;
};
class TaskId {
public:
using task_index_t = uint32_t;
constexpr static size_t kTaskIndexBits = 21;
constexpr static task_index_t kMaxTaskIndex =
(task_index_t{1} << kTaskIndexBits) - task_index_t{1};
TaskId(const StreamId& stream_id, task_index_t task_index)
: stream_id_(stream_id), task_index_(task_index) {
CHECK_LE(task_index, kMaxTaskIndex);
}
const StreamId& stream_id() const { return stream_id_; }
task_index_t task_index() const { return task_index_; }
bool operator==(const TaskId& rhs) const {
return stream_id_ == rhs.stream_id_ && task_index_ == rhs.task_index_;
}
bool operator!=(const TaskId& rhs) const { return !(*this == rhs); }
size_t hash() const {
size_t hash = stream_id_.hash();
HashCombine(&hash, std::hash<task_index_t>{}(task_index_));
return hash;
}
private:
StreamId stream_id_;
task_index_t task_index_;
};
} // namespace oneflow
namespace std {
template<>
struct hash<oneflow::DeviceId> {
size_t operator()(const oneflow::DeviceId& device_id) const { return device_id.hash(); }
};
template<>
struct hash<oneflow::StreamId> {
size_t operator()(const oneflow::StreamId& stream_id) const { return stream_id.hash(); }
};
template<>
struct hash<oneflow::TaskId> {
size_t operator()(const oneflow::TaskId& task_id) const { return task_id.hash(); }
};
} // namespace std
#endif // ONEFLOW_CORE_COMMON_ID_UTIL_H_
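A minimal usage sketch of the structs above; the function name and concrete values are illustrative only, and it assumes the surrounding OneFlow build (which provides CHECK_EQ and the header defined here):

#include "oneflow/core/common/id_util.h"
#include <unordered_map>

namespace oneflow {

void IdUtilUsageSketch() {
  DeviceId device_id{/*rank=*/0, DeviceType::kGPU, /*device_index=*/3};
  StreamId stream_id{device_id, /*stream_index=*/1};
  TaskId task_id{stream_id, /*task_index=*/7};
  CHECK_EQ(task_id.stream_id().device_id().rank(), 0);
  // The std::hash specializations above make the structs usable as map keys.
  std::unordered_map<StreamId, int> task_cnt_per_stream;
  task_cnt_per_stream[stream_id] += 1;
}

}  // namespace oneflow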
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/device/cpu_stream_index.h"
#include "oneflow/core/job/resource_desc.h"
namespace oneflow {
CPUStreamIndexGenerator::CPUStreamIndexGenerator()
: next_stream_index_(0), compute_stream_index_counter_(0) {
compute_stream_index_begin_ = next_stream_index_;
// TODO: the number of compute streams will not be tied to cpu_device_num in the future
compute_stream_num_ = Global<ResourceDesc, ForSession>::Get()->CpuDeviceNum();
next_stream_index_ += compute_stream_num_;
comm_net_stream_index_ = next_stream_index_;
next_stream_index_++;
tick_tock_stream_index_ = next_stream_index_;
next_stream_index_++;
}
StreamIndexGenerator::stream_index_t CPUStreamIndexGenerator::GenerateComputeStreamIndex() {
return compute_stream_index_begin_ + (compute_stream_index_counter_++ % compute_stream_num_);
}
StreamIndexGenerator::stream_index_t CPUStreamIndexGenerator::GenerateCommNetStreamIndex() {
return comm_net_stream_index_;
}
StreamIndexGenerator::stream_index_t CPUStreamIndexGenerator::GenerateTickTockStreamIndex() {
return tick_tock_stream_index_;
}
StreamIndexGenerator::stream_index_t CPUStreamIndexGenerator::GenerateIndependentTaskStreamIndex(
TaskType task_type) {
auto max_num_iter = task_type2max_stream_num_.end();
if (IsClassRegistered<int32_t, IndependentThreadNum4TaskType>(task_type)) {
std::unique_ptr<IndependentThreadNum4TaskType> thread_num_ptr(
NewObj<int32_t, IndependentThreadNum4TaskType>(task_type));
const size_t max_num = static_cast<size_t>(*thread_num_ptr.get());
max_num_iter = task_type2max_stream_num_.find(task_type);
if (max_num_iter == task_type2max_stream_num_.end()) {
task_type2max_stream_num_.emplace(task_type, max_num);
CHECK(task_type2allocated_stream_index_vec_.emplace(task_type, std::vector<stream_index_t>{})
.second);
} else {
CHECK_EQ(max_num_iter->second, max_num);
CHECK(task_type2allocated_stream_index_vec_.find(task_type)
!= task_type2allocated_stream_index_vec_.end());
}
}
stream_index_t index = next_stream_index_;
if (max_num_iter != task_type2max_stream_num_.end()) {
auto& allocated_stream_index_vec = task_type2allocated_stream_index_vec_[task_type];
if (allocated_stream_index_vec.size() < max_num_iter->second) {
allocated_stream_index_vec.push_back(index);
next_stream_index_++;
} else {
CHECK_EQ(allocated_stream_index_vec.size(), max_num_iter->second);
auto& next = task_type2allocated_stream_index_vec_index_[task_type];
index = allocated_stream_index_vec[next++];
if (next >= allocated_stream_index_vec.size()) { next %= allocated_stream_index_vec.size(); }
}
} else {
next_stream_index_++;
}
return index;
}
REGISTER_STREAM_INDEX_GENERATOR(DeviceType::kCPU, CPUStreamIndexGenerator);
} // namespace oneflow
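A behavioral sketch of the CPU generator above, assuming a session where CpuDeviceNum() returns 4 (the function name and values are illustrative):

#include "oneflow/core/device/cpu_stream_index.h"

namespace oneflow {

void CpuStreamIndexSketch() {
  CPUStreamIndexGenerator gen;  // reads CpuDeviceNum() from the session, assumed 4 here
  // Compute stream indices are handed out round-robin over [0, 4).
  CHECK_EQ(gen.GenerateComputeStreamIndex(), 0);
  CHECK_EQ(gen.GenerateComputeStreamIndex(), 1);
  // Comm-net and tick-tock each own one fixed index after the compute range.
  CHECK_EQ(gen.GenerateCommNetStreamIndex(), 4);
  CHECK_EQ(gen.GenerateTickTockStreamIndex(), 5);
  // Independent task types with a registered thread-num cap reuse their
  // allocated indices round-robin once the cap is reached; uncapped types
  // always receive a fresh index.
}

}  // namespace oneflow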
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_DEVICE_CPU_STREAM_INDEX_H_
#define ONEFLOW_CORE_DEVICE_CPU_STREAM_INDEX_H_
#include "oneflow/core/device/stream_index.h"
#include "oneflow/core/graph/task_node.h"
namespace oneflow {
class CPUStreamIndexGenerator final : public StreamIndexGenerator {
public:
CPUStreamIndexGenerator();
OF_DISALLOW_COPY_AND_MOVE(CPUStreamIndexGenerator);
~CPUStreamIndexGenerator() = default;
stream_index_t GenerateComputeStreamIndex() override;
stream_index_t GenerateH2DStreamIndex() override { UNIMPLEMENTED(); }
stream_index_t GenerateD2HStreamIndex() override { UNIMPLEMENTED(); }
stream_index_t GenerateCommNetStreamIndex();
stream_index_t GenerateTickTockStreamIndex();
stream_index_t GenerateIndependentTaskStreamIndex(TaskType task_type);
private:
stream_index_t next_stream_index_;
stream_index_t compute_stream_index_begin_;
stream_index_t compute_stream_num_;
stream_index_t comm_net_stream_index_;
stream_index_t tick_tock_stream_index_;
// for GenerateComputeStreamIndex
stream_index_t compute_stream_index_counter_;
// for GenerateIndependentTaskStreamIndex
HashMap<TaskType, size_t> task_type2max_stream_num_;
HashMap<TaskType, std::vector<stream_index_t>> task_type2allocated_stream_index_vec_;
HashMap<TaskType, size_t> task_type2allocated_stream_index_vec_index_;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_DEVICE_CPU_STREAM_INDEX_H_
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/device/cuda_stream_index.h"
namespace oneflow {
REGISTER_STREAM_INDEX_GENERATOR(DeviceType::kGPU, CudaStreamIndexGenerator);
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_DEVICE_CUDA_STREAM_INDEX_H_
#define ONEFLOW_CORE_DEVICE_CUDA_STREAM_INDEX_H_
#include "oneflow/core/device/stream_index.h"
namespace oneflow {
class CudaStreamIndexGenerator final : public StreamIndexGenerator {
public:
stream_index_t GenerateComputeStreamIndex() override { return kCompute; }
stream_index_t GenerateH2DStreamIndex() override { return kH2D; }
stream_index_t GenerateD2HStreamIndex() override { return kD2H; }
stream_index_t GenerateMixStreamIndex() { return kMix; }
stream_index_t GenerateNcclStreamIndex() { return kNccl; }
stream_index_t GenerateDecodeH2DStreamIndex() { return kDecodeH2D; }
private:
static const stream_index_t kCompute = 0;
static const stream_index_t kH2D = 1;
static const stream_index_t kD2H = 2;
static const stream_index_t kMix = 3;
static const stream_index_t kNccl = 4;
static const stream_index_t kDecodeH2D = 5;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_DEVICE_CUDA_STREAM_INDEX_H_
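Sketch of how a GPU thrd_id is derived under the new scheme: pick the fixed per-purpose stream index, wrap it in a StreamId, then serialize via SerializeStreamIdToInt64 from oneflow/core/graph/id_serialization.h (introduced below). NcclThrdIdSketch is illustrative; production code obtains the generator through the manager rather than constructing it directly:

#include "oneflow/core/device/cuda_stream_index.h"
#include "oneflow/core/graph/id_serialization.h"

namespace oneflow {

int64_t NcclThrdIdSketch(int64_t machine_id, int64_t dev_phy_id) {
  DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kGPU,
                     static_cast<DeviceId::device_index_t>(dev_phy_id)};
  CudaStreamIndexGenerator generator;  // in real code, fetched via the manager
  StreamId stream_id{device_id, generator.GenerateNcclStreamIndex()};
  return SerializeStreamIdToInt64(stream_id);
}

}  // namespace oneflow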
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_DEVICE_STREAM_INDEX_H_
#define ONEFLOW_CORE_DEVICE_STREAM_INDEX_H_
#include "oneflow/core/common/util.h"
#include "oneflow/core/common/auto_registration_factory.h"
#include "oneflow/core/common/id_util.h"
namespace oneflow {
class StreamIndexGenerator {
public:
using stream_index_t = StreamId::stream_index_t;
virtual stream_index_t GenerateComputeStreamIndex() = 0;
virtual stream_index_t GenerateH2DStreamIndex() = 0;
virtual stream_index_t GenerateD2HStreamIndex() = 0;
};
class StreamIndexGeneratorManager final {
public:
StreamIndexGeneratorManager() = default;
OF_DISALLOW_COPY_AND_MOVE(StreamIndexGeneratorManager);
~StreamIndexGeneratorManager() = default;
StreamIndexGenerator* GetGenerator(const DeviceId& device_id) {
auto iter = generators_.find(device_id);
if (iter == generators_.end()) {
auto* generator = NewObj<int, StreamIndexGenerator>(device_id.device_type());
generators_[device_id].reset(generator);
return generator;
}
return iter->second.get();
}
private:
HashMap<DeviceId, std::unique_ptr<StreamIndexGenerator>> generators_;
};
#define REGISTER_STREAM_INDEX_GENERATOR(device_type_v, stream_index_generator_class) \
REGISTER_CLASS(int, device_type_v, StreamIndexGenerator, stream_index_generator_class)
} // namespace oneflow
#endif // ONEFLOW_CORE_DEVICE_STREAM_INDEX_H_
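Lookup sketch for the manager: generators are created lazily per DeviceId through the class registry, so repeated lookups with the same DeviceId return the same cached instance and its internal counters persist. This assumes a registered generator for the device type (e.g. the CPU one above) and a live session:

namespace oneflow {

void StreamIndexManagerSketch() {
  StreamIndexGeneratorManager mgr;
  DeviceId cpu0{/*rank=*/0, DeviceType::kCPU, DeviceId::kCPUDeviceIndex};
  StreamIndexGenerator* gen = mgr.GetGenerator(cpu0);
  CHECK_EQ(gen, mgr.GetGenerator(cpu0));  // cached, not re-created
  gen->GenerateComputeStreamIndex();
}

}  // namespace oneflow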
@@ -21,6 +21,9 @@ limitations under the License.
#include "oneflow/core/graph/slice_boxing_task_node.h"
#include "oneflow/core/graph/collective_boxing_pack_task_node.h"
#include "oneflow/core/graph/collective_boxing_unpack_task_node.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"
#include "oneflow/core/device/cuda_stream_index.h"
#ifdef WITH_CUDA
#include <nccl.h>
#endif
@@ -62,8 +65,14 @@ void NcclInitCollectiveNode(CollectiveBoxingGenericTaskNode* node,
rank_desc->set_rank(parallel_id);
const int64_t machine_id = CHECK_JUST(parallel_desc.MachineId4ParallelId(parallel_id));
const int64_t device_id = CHECK_JUST(parallel_desc.DeviceId4ParallelId(parallel_id));
const int64_t thrd_id = Global<IDMgr>::Get()->GetGpuNcclThrdId(device_id);
const int64_t device_index = CHECK_JUST(parallel_desc.DeviceId4ParallelId(parallel_id));
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kGPU,
static_cast<DeviceId::device_index_t>(device_index)};
auto* stream_index_generator = dynamic_cast<CudaStreamIndexGenerator*>(
Global<IDMgr>::Get()->GetStreamIndexGeneratorManager()->GetGenerator(device_id));
CHECK_NOTNULL(stream_index_generator);
auto stream_index = stream_index_generator->GenerateNcclStreamIndex();
const int64_t thrd_id = SerializeStreamIdToInt64(StreamId{device_id, stream_index});
node->Init(machine_id, thrd_id, NewAreaId(), op_conf);
}
@@ -383,8 +392,13 @@ class NcclCollectiveBoxingAll2AllSubTskGphBuilder final : public SubTskGphBuilde
const std::string op_name = "System-Boxing-NcclCollectiveBoxingAll2All-" + NewUniqueId();
FOR_RANGE(int64_t, i, 0, in_parallel_desc.parallel_num()) {
const int64_t machine_id = CHECK_JUST(in_parallel_desc.MachineId4ParallelId(i));
const int64_t device_id = CHECK_JUST(in_parallel_desc.DeviceId4ParallelId(i));
const int64_t thrd_id = Global<IDMgr>::Get()->GetGpuComputeThrdId(device_id);
const int64_t device_index = CHECK_JUST(in_parallel_desc.DeviceId4ParallelId(i));
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kGPU,
static_cast<DeviceId::device_index_t>(device_index)};
auto* stream_index_generator =
Global<IDMgr>::Get()->GetStreamIndexGeneratorManager()->GetGenerator(device_id);
auto stream_index = stream_index_generator->GenerateComputeStreamIndex();
const int64_t thrd_id = SerializeStreamIdToInt64(StreamId{device_id, stream_index});
TaskNode* in_node = sorted_in_tasks.at(i);
CollectiveBoxingPackTaskNode* pack_node =
ctx->task_graph()->NewNode<CollectiveBoxingPackTaskNode>();
......
@@ -16,6 +16,9 @@ limitations under the License.
#include "oneflow/core/graph/boxing/naive_b2p_sub_task_graph_builder.h"
#include "oneflow/core/graph/boxing/sub_task_graph_builder_util.h"
#include "oneflow/core/graph/boxing_zeros_task_node.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"
#include "oneflow/core/device/stream_index.h"
namespace oneflow {
@@ -57,7 +60,12 @@ Maybe<SubTskGphBuilderStatus> NaiveB2PSubTskGphBuilder::Build(
int64_t thrd_id;
if (out_parallel_desc.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
thrd_id = Global<IDMgr>::Get()->GetGpuComputeThrdId(out_dev_phy_id);
DeviceId device_id{static_cast<DeviceId::rank_t>(out_machine_id), DeviceType::kGPU,
static_cast<DeviceId::device_index_t>(out_dev_phy_id)};
auto* stream_index_generator =
Global<IDMgr>::Get()->GetStreamIndexGeneratorManager()->GetGenerator(device_id);
auto stream_index = stream_index_generator->GenerateComputeStreamIndex();
thrd_id = SerializeStreamIdToInt64(StreamId{device_id, stream_index});
#else
UNIMPLEMENTED();
#endif
......
@@ -18,6 +18,12 @@ limitations under the License.
#include "oneflow/core/common/balanced_splitter.h"
#include "oneflow/core/graph/slice_boxing_task_node.h"
#include "oneflow/core/graph/boxing/sub_task_graph_builder_util.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"
#include "oneflow/core/device/cpu_stream_index.h"
#ifdef WITH_CUDA
#include "oneflow/core/device/cuda_stream_index.h"
#endif
namespace oneflow {
@@ -84,18 +90,31 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
|| SubTskGphBuilderUtil::IsBoxingB2S(in_sbp_parallel, out_sbp_parallel))) {
return Error::BoxingNotSupportedError();
}
const auto GetBoxingGpuThrdId = [](const int64_t dev_id, CudaWorkType work_type) -> int64_t {
const auto GetBoxingGpuThrdId = [](int64_t machine_id, int64_t dev_id,
CudaWorkType work_type) -> int64_t {
int64_t thrd_id = -1;
#ifdef WITH_CUDA
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kGPU,
static_cast<DeviceId::device_index_t>(dev_id)};
auto* generator = dynamic_cast<CudaStreamIndexGenerator*>(
Global<IDMgr>::Get()->GetStreamIndexGeneratorManager()->GetGenerator(device_id));
CHECK_NOTNULL(generator);
StreamId::stream_index_t stream_index = 0;
if (work_type == CudaWorkType::kCopyH2D) {
return Global<IDMgr>::Get()->GetGpuH2DThrdId(dev_id);
stream_index = generator->GenerateH2DStreamIndex();
} else if (work_type == CudaWorkType::kCopyD2H) {
return Global<IDMgr>::Get()->GetGpuD2HThrdId(dev_id);
stream_index = generator->GenerateD2HStreamIndex();
} else if (work_type == CudaWorkType::kMix) {
stream_index = generator->GenerateMixStreamIndex();
} else {
return Global<IDMgr>::Get()->GetGpuMixThrdId(dev_id);
UNIMPLEMENTED();
}
thrd_id = SerializeStreamIdToInt64(StreamId{device_id, stream_index});
#else
UNIMPLEMENTED();
#endif
return thrd_id;
};
const auto NewEdge = [&ctx]() -> TaskEdge* { return ctx->task_graph()->NewEdge(); };
@@ -110,8 +129,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(machine_id);
} else if (pd.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
thrd_id = GetBoxingGpuThrdId(CHECK_JUST(pd.DeviceId4ParallelId(parallel_id)),
CudaWorkType::kCopyH2D);
int64_t dev_id = CHECK_JUST(pd.DeviceId4ParallelId(parallel_id));
thrd_id = GetBoxingGpuThrdId(machine_id, dev_id, CudaWorkType::kCopyH2D);
#else
UNIMPLEMENTED();
#endif
@@ -131,7 +150,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(src_node->machine_id());
} else if (src_node->device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
thrd_id = GetBoxingGpuThrdId(src_node->GpuPhyId(), CudaWorkType::kCopyD2H);
thrd_id =
GetBoxingGpuThrdId(src_node->machine_id(), src_node->GpuPhyId(), CudaWorkType::kCopyD2H);
#else
UNIMPLEMENTED();
#endif
@@ -256,9 +276,9 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
local_concat_thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(in_machine_id);
} else if (in_pd.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
local_concat_thrd_id = GetBoxingGpuThrdId(
in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()))->GpuPhyId(),
CudaWorkType::kCopyD2H);
TaskNode* node = in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()));
local_concat_thrd_id =
GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), CudaWorkType::kCopyD2H);
#else
UNIMPLEMENTED();
#endif
@@ -280,67 +300,68 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
out_nodes->push_back(out_node);
}
};
const auto BuildSubTaskGphP2S =
[&ctx, &lbi, &CreateBoxingNode121, &CreateBoxingNodeToHost, &GetBoxingGpuThrdId, &NewEdge](
const ParallelDesc& in_pd, const ParallelDesc& out_pd, const SbpParallel& in_sbp,
const SbpParallel& out_sbp, const BlobDesc& blob_desc,
const std::vector<TaskNode*>& in_nodes, std::vector<TaskNode*>* out_nodes) {
CHECK(SubTskGphBuilderUtil::IsBoxingP2S(in_sbp, out_sbp));
const TensorSliceView in_slice =
SubTskGphBuilderUtil::GetBroadcastTensorSliceView(blob_desc);
const std::vector<TensorSliceView> out_slices =
SubTskGphBuilderUtil::GetTensorSliceView(out_pd.parallel_num(), out_sbp, blob_desc);
CHECK(!ContainsEmptySlice(out_slices));
HashMap<int64_t, std::vector<int64_t>> machine_id2in_parallel_ids;
GroupParallelIdByMachine(in_pd, &machine_id2in_parallel_ids);
FOR_RANGE(int64_t, out_id, 0, out_pd.parallel_num()) {
const TensorSliceView& out_slice = out_slices.at(out_id);
SliceBoxingTaskNode* out_node =
CreateBoxingNode121(out_pd, out_id, out_slice, kSliceBoxingTaskModeAdd);
for (const auto& pair : machine_id2in_parallel_ids) {
const int64_t in_machine_id = pair.first;
const std::vector<int64_t>& in_parallel_ids = pair.second;
if (out_node->machine_id() == in_machine_id) {
for (const int64_t in_id : in_parallel_ids) {
TaskNode* in_node = in_nodes.at(in_id);
if (SubTskGphBuilderUtil::IsOnSameGPU(in_node, out_node)) {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
} else if (in_pd.device_type() == DeviceType::kGPU) {
SliceBoxingTaskNode* copy_to_host =
CreateBoxingNodeToHost(in_node, in_slice, out_slice);
out_node->ConnectToSrcNodeWithSlice(copy_to_host, NewEdge(), out_slice);
} else {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
}
}
const auto BuildSubTaskGphP2S = [&ctx, &lbi, &CreateBoxingNode121, &CreateBoxingNodeToHost,
&GetBoxingGpuThrdId,
&NewEdge](const ParallelDesc& in_pd, const ParallelDesc& out_pd,
const SbpParallel& in_sbp, const SbpParallel& out_sbp,
const BlobDesc& blob_desc,
const std::vector<TaskNode*>& in_nodes,
std::vector<TaskNode*>* out_nodes) {
CHECK(SubTskGphBuilderUtil::IsBoxingP2S(in_sbp, out_sbp));
const TensorSliceView in_slice = SubTskGphBuilderUtil::GetBroadcastTensorSliceView(blob_desc);
const std::vector<TensorSliceView> out_slices =
SubTskGphBuilderUtil::GetTensorSliceView(out_pd.parallel_num(), out_sbp, blob_desc);
CHECK(!ContainsEmptySlice(out_slices));
HashMap<int64_t, std::vector<int64_t>> machine_id2in_parallel_ids;
GroupParallelIdByMachine(in_pd, &machine_id2in_parallel_ids);
FOR_RANGE(int64_t, out_id, 0, out_pd.parallel_num()) {
const TensorSliceView& out_slice = out_slices.at(out_id);
SliceBoxingTaskNode* out_node =
CreateBoxingNode121(out_pd, out_id, out_slice, kSliceBoxingTaskModeAdd);
for (const auto& pair : machine_id2in_parallel_ids) {
const int64_t in_machine_id = pair.first;
const std::vector<int64_t>& in_parallel_ids = pair.second;
if (out_node->machine_id() == in_machine_id) {
for (const int64_t in_id : in_parallel_ids) {
TaskNode* in_node = in_nodes.at(in_id);
if (SubTskGphBuilderUtil::IsOnSameGPU(in_node, out_node)) {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
} else if (in_pd.device_type() == DeviceType::kGPU) {
SliceBoxingTaskNode* copy_to_host =
CreateBoxingNodeToHost(in_node, in_slice, out_slice);
out_node->ConnectToSrcNodeWithSlice(copy_to_host, NewEdge(), out_slice);
} else {
auto* local_add_node = ctx->task_graph()->NewNode<SliceBoxingTaskNode>();
int64_t local_add_thrd_id = -1;
if (in_pd.device_type() == DeviceType::kCPU) {
local_add_thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(in_machine_id);
} else if (in_pd.device_type() == DeviceType::kGPU) {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
}
}
} else {
auto* local_add_node = ctx->task_graph()->NewNode<SliceBoxingTaskNode>();
int64_t local_add_thrd_id = -1;
if (in_pd.device_type() == DeviceType::kCPU) {
local_add_thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(in_machine_id);
} else if (in_pd.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
local_add_thrd_id = GetBoxingGpuThrdId(
in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()))->GpuPhyId(),
CudaWorkType::kCopyD2H);
TaskNode* node = in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()));
local_add_thrd_id =
GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), CudaWorkType::kCopyD2H);
#else
UNIMPLEMENTED();
UNIMPLEMENTED();
#endif
}
local_add_node->Init(lbi, out_slice, kSliceBoxingTaskModeAdd, in_machine_id,
local_add_thrd_id, Global<IDMgr>::Get()->CpuMemZoneId());
for (const int64_t in_id : in_parallel_ids) {
local_add_node->ConnectToSrcNodeWithSlice(in_nodes.at(in_id), NewEdge(), in_slice);
}
TaskNode* local_add_proxy_node =
ctx->GetProxyNode(local_add_node, Global<IDMgr>::Get()->CpuMemZoneId(),
out_node->machine_id(), Global<IDMgr>::Get()->CpuMemZoneId());
out_node->ConnectToSrcNodeWithSlice(local_add_proxy_node, NewEdge(), out_slice);
}
}
out_nodes->push_back(out_node);
local_add_node->Init(lbi, out_slice, kSliceBoxingTaskModeAdd, in_machine_id,
local_add_thrd_id, Global<IDMgr>::Get()->CpuMemZoneId());
for (const int64_t in_id : in_parallel_ids) {
local_add_node->ConnectToSrcNodeWithSlice(in_nodes.at(in_id), NewEdge(), in_slice);
}
TaskNode* local_add_proxy_node =
ctx->GetProxyNode(local_add_node, Global<IDMgr>::Get()->CpuMemZoneId(),
out_node->machine_id(), Global<IDMgr>::Get()->CpuMemZoneId());
out_node->ConnectToSrcNodeWithSlice(local_add_proxy_node, NewEdge(), out_slice);
}
};
}
out_nodes->push_back(out_node);
}
};
const auto BuildSubTaskGphP2B = [&ctx, &lbi, &GetBoxingGpuThrdId, &NewEdge](
const ParallelDesc& in_pd, const ParallelDesc& out_pd,
@@ -368,8 +389,9 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
local_add_thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(in_machine_id);
} else if (in_pd.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
local_add_thrd_id = GetBoxingGpuThrdId(in_nodes.at(in_ids_on_machine.front())->GpuPhyId(),
CudaWorkType::kCopyH2D);
TaskNode* node = in_nodes.at(in_ids_on_machine.front());
local_add_thrd_id =
GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), CudaWorkType::kCopyH2D);
#else
UNIMPLEMENTED();
#endif
......
@@ -15,8 +15,10 @@ limitations under the License.
*/
#include "oneflow/core/framework/to_string.h"
#include "oneflow/core/graph/copy_task_node.h"
#include "oneflow/core/job/thrd_id_generator.h"
#include "oneflow/core/operator/operator.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"
#include "oneflow/core/device/cpu_stream_index.h"
namespace oneflow {
@@ -58,13 +60,19 @@ void CopyTaskNode::InferProducedDataRegstTimeShape() { NaiveInferProducedDataReg
void CopyHdTaskNode::Init(CopyHdOpConf::Type copy_type, int64_t machine_id, int64_t dev_phy_id) {
copy_type_ = copy_type;
set_machine_id(machine_id);
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kGPU,
static_cast<DeviceId::device_index_t>(dev_phy_id)};
auto* stream_index_generator =
Global<IDMgr>::Get()->GetStreamIndexGeneratorManager()->GetGenerator(device_id);
StreamId::stream_index_t stream_index = 0;
if (copy_type == CopyHdOpConf::H2D) {
set_thrd_id(Global<IDMgr>::Get()->GetGpuH2DThrdId(dev_phy_id));
stream_index = stream_index_generator->GenerateH2DStreamIndex();
} else if (copy_type == CopyHdOpConf::D2H) {
set_thrd_id(Global<IDMgr>::Get()->GetGpuD2HThrdId(dev_phy_id));
stream_index = stream_index_generator->GenerateD2HStreamIndex();
} else {
UNIMPLEMENTED();
}
set_thrd_id(SerializeStreamIdToInt64(StreamId{device_id, stream_index}));
}
void CopyHdTaskNode::InitProducedRegstMemCase(MemoryCase* mem_case) {
@@ -92,7 +100,13 @@ OperatorConf CopyHdTaskNode::NewCopyOpConf() {
void CopyCommNetTaskNode::Init(int64_t machine_id) {
set_machine_id(machine_id);
set_thrd_id(Global<IDMgr>::Get()->CommNetThrdId());
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kCPU,
DeviceId::kCPUDeviceIndex};
auto* generator = dynamic_cast<CPUStreamIndexGenerator*>(
Global<IDMgr>::Get()->GetStreamIndexGeneratorManager()->GetGenerator(device_id));
CHECK_NOTNULL(generator);
StreamId stream_id{device_id, generator->GenerateCommNetStreamIndex()};
set_thrd_id(SerializeStreamIdToInt64(stream_id));
}
void CopyCommNetTaskNode::InitProducedRegstMemCase(MemoryCase* mem_case) {
......
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/graph/id_serialization.h"
#include <climits>
namespace oneflow {
// TaskId encoding (may be extended to 128 bits in the future)
// |            rank            | device_type | device_index  |                           |
// | ----------- 19 ----------- | ---- 5 ---- | ----- 7 ----- |                           |
// |                         DeviceId                         | stream_index |            |
// | ------------------------- 31 --------------------------- | ---- 12 ---- |            |
// |                                 StreamId                                 | task_index |
// | -------------------------------- 43 ----------------------------------- | --- 21 --- |
// |                                        TaskId                                        |
// | ----------------------------------- 64 bit ----------------------------------------- |
namespace {
constexpr size_t kInt64Bits = sizeof(int64_t) * CHAR_BIT;
} // namespace
namespace stream_id_const {
constexpr size_t kDeviceIndexShift = StreamId::kStreamIndexBits;
constexpr size_t kDeviceTypeShift = kDeviceIndexShift + DeviceId::kDeviceIndexBits;
constexpr size_t kRankShift = kDeviceTypeShift + DeviceId::kDeviceTypeBits;
static_assert(kInt64Bits == kRankShift + DeviceId::kRankBits + TaskId::kTaskIndexBits, "");
constexpr int64_t kStreamIndexInt64Mask = (int64_t{1} << StreamId::kStreamIndexBits) - 1;
constexpr int64_t kDeviceIndexInt64Mask = ((int64_t{1} << DeviceId::kDeviceIndexBits) - 1)
<< kDeviceIndexShift;
constexpr int64_t kDeviceTypeInt64Mask = ((int64_t{1} << DeviceId::kDeviceTypeBits) - 1)
<< kDeviceTypeShift;
constexpr int64_t kRankInt64Mask = ((int64_t{1} << DeviceId::kRankBits) - 1) << kRankShift;
} // namespace stream_id_const
int64_t SerializeStreamIdToInt64(const StreamId& stream_id) {
int64_t id = static_cast<int64_t>(stream_id.stream_index());
id |= static_cast<int64_t>(stream_id.device_id().device_index())
<< stream_id_const::kDeviceIndexShift;
id |= static_cast<int64_t>(stream_id.device_id().device_type())
<< stream_id_const::kDeviceTypeShift;
id |= static_cast<int64_t>(stream_id.device_id().rank()) << stream_id_const::kRankShift;
return id;
}
StreamId DeserializeStreamIdFromInt64(int64_t stream_id_val) {
int64_t rank = (stream_id_val & stream_id_const::kRankInt64Mask) >> stream_id_const::kRankShift;
int64_t device_type =
(stream_id_val & stream_id_const::kDeviceTypeInt64Mask) >> stream_id_const::kDeviceTypeShift;
int64_t device_index = (stream_id_val & stream_id_const::kDeviceIndexInt64Mask)
>> stream_id_const::kDeviceIndexShift;
int64_t stream_index = (stream_id_val & stream_id_const::kStreamIndexInt64Mask);
DeviceId device_id{static_cast<DeviceId::rank_t>(rank), static_cast<DeviceType>(device_type),
static_cast<DeviceId::device_index_t>(device_index)};
return StreamId{device_id, static_cast<StreamId::stream_index_t>(stream_index)};
}
namespace task_id_const {
constexpr size_t kStreamIndexShift = TaskId::kTaskIndexBits;
constexpr size_t kDeviceIndexShift = kStreamIndexShift + StreamId::kStreamIndexBits;
constexpr size_t kDeviceTypeShift = kDeviceIndexShift + DeviceId::kDeviceIndexBits;
constexpr size_t kRankShift = kDeviceTypeShift + DeviceId::kDeviceTypeBits;
static_assert(kInt64Bits == kRankShift + DeviceId::kRankBits, "");
constexpr int64_t kTaskIndexInt64Mask = (int64_t{1} << TaskId::kTaskIndexBits) - 1;
constexpr int64_t kStreamIndexInt64Mask = ((int64_t{1} << StreamId::kStreamIndexBits) - 1)
<< kStreamIndexShift;
constexpr int64_t kDeviceIndexInt64Mask = ((int64_t{1} << DeviceId::kDeviceIndexBits) - 1)
<< kDeviceIndexShift;
constexpr int64_t kDeviceTypeInt64Mask = ((int64_t{1} << DeviceId::kDeviceTypeBits) - 1)
<< kDeviceTypeShift;
constexpr int64_t kRankInt64Mask = ((int64_t{1} << DeviceId::kRankBits) - 1) << kRankShift;
} // namespace task_id_const
int64_t SerializeTaskIdToInt64(const TaskId& task_id) {
int64_t id = static_cast<int64_t>(task_id.task_index());
id |= static_cast<int64_t>(task_id.stream_id().stream_index())
<< task_id_const::kStreamIndexShift;
id |= static_cast<int64_t>(task_id.stream_id().device_id().device_index())
<< task_id_const::kDeviceIndexShift;
id |= static_cast<int64_t>(task_id.stream_id().device_id().device_type())
<< task_id_const::kDeviceTypeShift;
id |= static_cast<int64_t>(task_id.stream_id().device_id().rank()) << task_id_const::kRankShift;
return id;
}
TaskId DeserializeTaskIdFromInt64(int64_t task_id_val) {
int64_t rank = (task_id_val & task_id_const::kRankInt64Mask) >> task_id_const::kRankShift;
int64_t device_type =
(task_id_val & task_id_const::kDeviceTypeInt64Mask) >> task_id_const::kDeviceTypeShift;
int64_t device_index =
(task_id_val & task_id_const::kDeviceIndexInt64Mask) >> task_id_const::kDeviceIndexShift;
int64_t stream_index =
(task_id_val & task_id_const::kStreamIndexInt64Mask) >> task_id_const::kStreamIndexShift;
int64_t task_index = task_id_val & task_id_const::kTaskIndexInt64Mask;
DeviceId device_id{static_cast<DeviceId::rank_t>(rank), static_cast<DeviceType>(device_type),
static_cast<DeviceId::device_index_t>(device_index)};
StreamId stream_id{device_id, static_cast<StreamId::stream_index_t>(stream_index)};
return TaskId{stream_id, static_cast<TaskId::task_index_t>(task_index)};
}
} // namespace oneflow
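A round-trip sketch for the encoding above (illustrative values): every field survives int64 serialization, and the low bits line up with the diagram (task_index in the low 21 bits, stream_index in the 12 bits above it):

#include "oneflow/core/graph/id_serialization.h"

namespace oneflow {

void IdSerializationRoundTripSketch() {
  DeviceId device_id{/*rank=*/2, DeviceType::kGPU, /*device_index=*/1};
  StreamId stream_id{device_id, /*stream_index=*/3};
  TaskId task_id{stream_id, /*task_index=*/42};
  int64_t task_id_val = SerializeTaskIdToInt64(task_id);
  // All fields survive the round trip.
  CHECK(DeserializeTaskIdFromInt64(task_id_val) == task_id);
  // Bit layout matches the diagram.
  CHECK_EQ(task_id_val & ((int64_t{1} << 21) - 1), 42);
  CHECK_EQ((task_id_val >> 21) & ((int64_t{1} << 12) - 1), 3);
}

}  // namespace oneflow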
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_GRAPH_ID_SERIALIZATION_H_
#define ONEFLOW_CORE_GRAPH_ID_SERIALIZATION_H_
#include "oneflow/core/common/id_util.h"
namespace oneflow {
int64_t SerializeStreamIdToInt64(const StreamId&);
StreamId DeserializeStreamIdFromInt64(int64_t);
int64_t SerializeTaskIdToInt64(const TaskId&);
TaskId DeserializeTaskIdFromInt64(int64_t);
} // namespace oneflow
#endif // ONEFLOW_CORE_GRAPH_ID_SERIALIZATION_H_
@@ -69,9 +69,7 @@ class LogicalNode : public Node<LogicalNode, LogicalEdge> {
// util
virtual std::string TypeName() const = 0;
std::string VisualStr() const;
void GenSortedCompTaskNodes(std::function<int64_t(const TaskNode*)> AllocateCpuThrdIdEvenly,
std::vector<std::pair<int64_t, CompTaskNode*>>* nodes,
std::function<void(CompTaskNode*)>) const;
void GenSortedCompTaskNodes(std::function<void(CompTaskNode*)>) const;
// other
virtual int64_t GetAreaId() const = 0;
@@ -96,8 +94,7 @@ class LogicalNode : public Node<LogicalNode, LogicalEdge> {
const std::vector<CompTaskNode*>& sorted_src_comp_tasks, \
const std::vector<CompTaskNode*>& sorted_dst_comp_tasks, \
std::function<TaskNode**(CompTaskNode * src, int64_t machine_id, int32_t mem_zone_id)> \
MutBufTask, \
std::function<int64_t(const TaskNode*)> AllocateCpuThrdIdEvenly)
MutBufTask)
class TaskGraph;
using BldSubTskGphMthd = void(TaskGraph::*) BLD_SUB_TSK_GPH_MTHD_ARGS();
......
@@ -17,7 +17,6 @@ limitations under the License.
#include "oneflow/core/common/util.h"
#include "oneflow/core/graph/inplace_lbi_graph.h"
#include "oneflow/core/register/runtime_blob_desc.h"
#include "oneflow/core/job/thrd_id_generator.h"
#include "oneflow/core/job/global_for.h"
#include "oneflow/core/operator/variable_op.h"
#include "oneflow/core/graph/op_graph.h"
@@ -250,34 +249,20 @@ TaskGraph::TaskGraph(std::unique_ptr<const LogicalGraph>&& logical_gph) {
return &(buf_vec.at(mem_zone_id));
};
std::vector<int64_t> cpu_device_offset(Global<ResourceDesc, ForSession>::Get()->TotalMachineNum(),
0);
auto AllocateCpuThrdIdEvenly = [&](const TaskNode* task_node) {
CHECK(!task_node->IsIndependent());
int64_t& offset = cpu_device_offset.at(task_node->machine_id());
int64_t ret = Global<IDMgr>::Get()->GetCpuDeviceThrdId(offset);
offset = (offset + 1) % Global<ResourceDesc, ForSession>::Get()->CpuDeviceNum();
return ret;
};
std::vector<std::pair<int64_t, CompTaskNode*>> machine_persistence_task_vec;
logical_gph_->ForEachNode([&](const LogicalNode* logical_node) {
logical_node->GenSortedCompTaskNodes(
AllocateCpuThrdIdEvenly, &machine_persistence_task_vec, [&](CompTaskNode* comp_task_node) {
AddAllocatedNode(comp_task_node);
logical2sorted_comp_tasks[logical_node].push_back(comp_task_node);
comp_task_node->set_area_id(logical_node->GetAreaId());
});
logical_node->GenSortedCompTaskNodes([&](CompTaskNode* comp_task_node) {
AddAllocatedNode(comp_task_node);
logical2sorted_comp_tasks[logical_node].push_back(comp_task_node);
comp_task_node->set_area_id(logical_node->GetAreaId());
});
});
GenerateIndependentThrdId(machine_persistence_task_vec);
logical_gph_->ForEachEdge([&](const LogicalEdge* logical_edge) {
BldSubTskGphMthd method =
GetMthdForBldSubTskGph(logical_edge->src_node(), logical_edge->dst_node());
(this->*method)(logical_edge->src_node(), logical_edge->dst_node(),
logical2sorted_comp_tasks.at(logical_edge->src_node()),
logical2sorted_comp_tasks.at(logical_edge->dst_node()), MutBufTask,
AllocateCpuThrdIdEvenly);
logical2sorted_comp_tasks.at(logical_edge->dst_node()), MutBufTask);
SetAreaIdForNewNodes(logical_edge->src_node(), logical_edge->dst_node());
});
logical_gph_->ForEachNecessaryCtrlEdge(
@@ -329,20 +314,6 @@ void TaskGraph::ConnectCtrlEdges(const std::vector<CompTaskNode*>& src_task_node
}
}
void TaskGraph::GenerateIndependentThrdId(
const std::vector<std::pair<int64_t, CompTaskNode*>>& persistence_nodes) {
std::vector<std::pair<int64_t, TaskType>> machine_task_type_vec;
for (auto pair : persistence_nodes) {
machine_task_type_vec.emplace_back(std::make_pair(pair.first, pair.second->GetTaskType()));
}
ThrdIdGenerator generator(machine_task_type_vec, Global<IDMgr>::Get()->BaseIndependentThrdId());
for (const auto& pair : persistence_nodes) {
int64_t thrd_id = generator.GenerateThrdId(pair.first, pair.second->GetTaskType());
pair.second->set_thrd_id(thrd_id);
}
}
void TaskGraph::AcyclicTopoForEachNode(std::function<bool(TaskNode* node)> IsAllowedStartNode,
const std::function<void(TaskNode* node)>& Handler) const {
auto ForEachInNode = [&](TaskNode* node, const std::function<void(TaskNode*)>& Handler) {
......
@@ -88,9 +88,6 @@ class TaskGraph final : public Graph<TaskNode, TaskEdge> {
void MergeChain();
void BuildCtrlRegstDescInSameChain();
void GenerateIndependentThrdId(
const std::vector<std::pair<int64_t, CompTaskNode*>>& persistence_nodes);
// inplace
void GetInplaceOpBlobArgList(
InplaceObasInfo* obas_info, const HashSet<TaskNode*>& dev_nodes,
......
@@ -13,36 +13,33 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_JOB_THRD_ID_GENERATOR_H_
#define ONEFLOW_CORE_JOB_THRD_ID_GENERATOR_H_
#ifndef ONEFLOW_CORE_GRAPH_TASK_ID_GENERATOR_H_
#define ONEFLOW_CORE_GRAPH_TASK_ID_GENERATOR_H_
#include "oneflow/core/job/task.pb.h"
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/common/util.h"
#include "oneflow/core/common/id_util.h"
namespace oneflow {
class ThrdIdGenerator final {
class TaskIdGenerator final {
public:
OF_DISALLOW_COPY_AND_MOVE(ThrdIdGenerator);
using task_index_t = TaskId::task_index_t;
ThrdIdGenerator(std::vector<std::pair<int64_t, TaskType>>& machine_task_type_vec,
int64_t base_thrd_id);
TaskIdGenerator() = default;
OF_DISALLOW_COPY_AND_MOVE(TaskIdGenerator);
~TaskIdGenerator() = default;
int64_t GenerateThrdId(int64_t machine_id, int64_t task_type);
TaskId Generate(const StreamId& stream_id);
private:
int64_t GetModThrdId(std::pair<int64_t, int64_t> machine_task_type);
bool TaskTypeThrdNumEqMax(int64_t task_type, int32_t thrd_num);
void InitLowerboundOfTaskType(const HashMap<int64_t, std::set<TaskType>>& machine2task_types);
int64_t tick_tock_thrd_id_;
int64_t base_thrd_id_;
HashMap<std::pair<int64_t, int64_t>, int32_t> machine_task_type2thrd_num_;
HashMap<std::pair<int64_t, int64_t>, int64_t> machine_task_type2offset_;
HashMap<std::pair<int64_t, int64_t>, int32_t> machine_task_type2lowerbound_;
HashMap<StreamId, task_index_t> stream_id2task_index_counter_;
};
inline TaskId TaskIdGenerator::Generate(const StreamId& stream_id) {
task_index_t task_index = stream_id2task_index_counter_[stream_id]++;
return TaskId{stream_id, task_index};
}
} // namespace oneflow
#endif // ONEFLOW_CORE_JOB_THRD_ID_GENERATOR_H_
#endif // ONEFLOW_CORE_GRAPH_TASK_ID_GENERATOR_H_
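Usage sketch for TaskIdGenerator (illustrative values): task_index is a per-StreamId counter, so tasks on the same stream receive consecutive indices while distinct streams count independently:

namespace oneflow {

void TaskIdGeneratorSketch() {
  TaskIdGenerator task_id_gen;
  DeviceId device_id{/*rank=*/0, DeviceType::kCPU, DeviceId::kCPUDeviceIndex};
  StreamId stream_a{device_id, /*stream_index=*/0};
  StreamId stream_b{device_id, /*stream_index=*/1};
  CHECK_EQ(task_id_gen.Generate(stream_a).task_index(), 0);
  CHECK_EQ(task_id_gen.Generate(stream_a).task_index(), 1);
  CHECK_EQ(task_id_gen.Generate(stream_b).task_index(), 0);  // independent counter
}

}  // namespace oneflow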
@@ -14,6 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/graph/task_node.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"
#include "oneflow/core/job/id_manager.h"
namespace oneflow {
@@ -385,7 +388,9 @@ void TaskNode::FixRegisterNumRange() {
void TaskNode::UpdateTaskId() {
CHECK_NE(machine_id_, -1);
CHECK_NE(thrd_id_, -1);
task_id_ = Global<IDMgr>::Get()->NewTaskId(machine_id_, thrd_id_);
StreamId stream_id = DeserializeStreamIdFromInt64(thrd_id_);
TaskId task_id = Global<IDMgr>::Get()->GetTaskIdGenerator()->Generate(stream_id);
task_id_ = SerializeTaskIdToInt64(task_id);
}
int64_t TaskNode::GlobalWorkStreamId() const {
......
@@ -22,6 +22,17 @@ limitations under the License.
#include "oneflow/core/operator/operator.h"
#include "oneflow/core/common/auto_registration_factory.h"
namespace std {
template<>
struct hash<oneflow::TaskType> {
std::size_t operator()(const oneflow::TaskType& task_type) const {
return std::hash<uint32_t>{}(static_cast<uint32_t>(task_type));
}
};
} // namespace std
namespace oneflow {
RegstDescProto* FindOrCreateProducedCtrlRegstDesc(TaskProto* task_proto,
......
@@ -22,6 +22,12 @@ limitations under the License.
#include "oneflow/core/graph/task_graph.h"
#include "oneflow/core/graph/op_graph.h"
#include "oneflow/core/framework/framework.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"
#include "oneflow/core/device/cpu_stream_index.h"
#ifdef WITH_CUDA
#include "oneflow/core/device/cuda_stream_index.h"
#endif
namespace oneflow {
@@ -112,10 +118,7 @@ std::string LogicalNode::VisualStr() const {
return ss.str();
}
void LogicalNode::GenSortedCompTaskNodes(
std::function<int64_t(const TaskNode*)> AllocateCpuThrdIdEvenly,
std::vector<std::pair<int64_t, CompTaskNode*>>* nodes,
std::function<void(CompTaskNode*)> Handler) const {
void LogicalNode::GenSortedCompTaskNodes(std::function<void(CompTaskNode*)> Handler) const {
int64_t parallel_idx = 0;
int64_t parallel_num = parallel_desc_->parallel_num();
for (int64_t machine_id : parallel_desc_->sorted_machine_ids()) {
@@ -125,45 +128,64 @@ void LogicalNode::GenSortedCompTaskNodes(
comp_task_node->mut_parallel_ctx()->set_parallel_id(parallel_idx++);
comp_task_node->mut_parallel_ctx()->set_parallel_num(parallel_num);
const IDMgr* id_mgr = Global<IDMgr>::Get();
if (parallel_desc_->device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kGPU,
static_cast<DeviceId::device_index_t>(dev_phy_id)};
StreamId::stream_index_t stream_index = 0;
auto* cuda_stream_index_generator = dynamic_cast<CudaStreamIndexGenerator*>(
Global<IDMgr>::Get()->GetStreamIndexGeneratorManager()->GetGenerator(device_id));
CHECK_NOTNULL(cuda_stream_index_generator);
switch (comp_task_node->GetCudaWorkType()) {
case CudaWorkType::kCompute: {
comp_task_node->set_thrd_id(id_mgr->GetGpuComputeThrdId(dev_phy_id));
stream_index = cuda_stream_index_generator->GenerateComputeStreamIndex();
break;
}
case CudaWorkType::kCopyH2D: {
comp_task_node->set_thrd_id(id_mgr->GetGpuH2DThrdId(dev_phy_id));
stream_index = cuda_stream_index_generator->GenerateH2DStreamIndex();
break;
}
case CudaWorkType::kCopyD2H: {
comp_task_node->set_thrd_id(id_mgr->GetGpuD2HThrdId(dev_phy_id));
stream_index = cuda_stream_index_generator->GenerateD2HStreamIndex();
break;
}
case CudaWorkType::kNccl: {
comp_task_node->set_thrd_id(id_mgr->GetGpuNcclThrdId(dev_phy_id));
stream_index = cuda_stream_index_generator->GenerateNcclStreamIndex();
break;
}
case CudaWorkType::kMix: {
comp_task_node->set_thrd_id(id_mgr->GetGpuMixThrdId(dev_phy_id));
stream_index = cuda_stream_index_generator->GenerateMixStreamIndex();
break;
}
case CudaWorkType::kDecodeH2D: {
comp_task_node->set_thrd_id(id_mgr->GetGpuDecodeH2DThrdId(dev_phy_id));
stream_index = cuda_stream_index_generator->GenerateDecodeH2DStreamIndex();
break;
}
default: UNIMPLEMENTED();
default: { UNIMPLEMENTED(); }
}
comp_task_node->set_thrd_id(SerializeStreamIdToInt64(StreamId{device_id, stream_index}));
#else
UNIMPLEMENTED();
#endif
} else if (parallel_desc_->device_type() == DeviceType::kCPU) {
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kCPU,
DeviceId::kCPUDeviceIndex};
auto* cpu_stream_index_generator = dynamic_cast<CPUStreamIndexGenerator*>(
Global<IDMgr>::Get()->GetStreamIndexGeneratorManager()->GetGenerator(device_id));
CHECK_NOTNULL(cpu_stream_index_generator);
StreamId::stream_index_t stream_index = 0;
if (comp_task_node->IsIndependent()) {
nodes->push_back({machine_id, comp_task_node});
TaskType task_type = comp_task_node->GetTaskType();
if (IsClassRegistered<int32_t, TickTockTaskType>(task_type)) {
stream_index = cpu_stream_index_generator->GenerateTickTockStreamIndex();
} else {
stream_index =
cpu_stream_index_generator->GenerateIndependentTaskStreamIndex(task_type);
}
} else {
comp_task_node->set_thrd_id(AllocateCpuThrdIdEvenly(comp_task_node));
stream_index = cpu_stream_index_generator->GenerateComputeStreamIndex();
}
comp_task_node->set_thrd_id(SerializeStreamIdToInt64(StreamId{device_id, stream_index}));
} else {
UNIMPLEMENTED();
}
......
@@ -15,71 +15,37 @@ limitations under the License.
*/
#include "oneflow/core/job/id_manager.h"
#include "oneflow/core/device/cuda_util.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"
namespace oneflow {
int64_t IDMgr::GetGpuH2DThrdId(int64_t dev_phy_id) const { return gpu_device_num_ + dev_phy_id; }
int64_t IDMgr::GetGpuD2HThrdId(int64_t dev_phy_id) const {
return gpu_device_num_ * 2 + dev_phy_id;
}
int64_t IDMgr::GetGpuNcclThrdId(int64_t dev_phy_id) const {
return gpu_device_num_ * 3 + dev_phy_id;
}
int64_t IDMgr::GetGpuMixThrdId(int64_t dev_phy_id) const {
return gpu_device_num_ * 4 + dev_phy_id;
}
int64_t IDMgr::GetGpuDecodeH2DThrdId(int64_t dev_phy_id) const {
return gpu_device_num_ * 5 + dev_phy_id;
}
int64_t IDMgr::GetCpuDeviceThrdId(int64_t dev_phy_id) const {
return gpu_device_num_ * GetCudaWorkTypeSize() + dev_phy_id;
}
int64_t IDMgr::CommNetThrdId() const {
return gpu_device_num_ * GetCudaWorkTypeSize() + cpu_device_num_;
}
int64_t IDMgr::TickTockThrdId() const { return CommNetThrdId() + 1; }
int64_t IDMgr::BaseIndependentThrdId() const { return base_independent_thrd_id_; }
void IDMgr::UpdateBaseIndependentThrdId(int64_t val) {
if (val >= base_independent_thrd_id_) { base_independent_thrd_id_ = val + 1; }
}
int64_t IDMgr::NewTaskId(int64_t machine_id, int64_t thrd_id) {
int64_t machine_thrd_id = GetMachineThrdId(machine_id, thrd_id);
CHECK_LT(machine_thrd_id2num_of_tasks_[machine_thrd_id],
(static_cast<int64_t>(1) << task_id_bit_num_) - 1);
return machine_thrd_id | (machine_thrd_id2num_of_tasks_[machine_thrd_id]++);
}
DeviceType IDMgr::GetDeviceTypeFromThrdId(int64_t thrd_id) const {
if (thrd_id < GetCudaWorkTypeSize() * gpu_device_num_) {
return DeviceType::kGPU;
} else {
return DeviceType::kCPU;
}
return DeserializeStreamIdFromInt64(thrd_id).device_id().device_type();
}
int64_t IDMgr::GetGpuPhyIdFromThrdId(int64_t thrd_id) const {
CHECK_LT(thrd_id, GetCudaWorkTypeSize() * gpu_device_num_);
return thrd_id % gpu_device_num_;
StreamId stream_id = DeserializeStreamIdFromInt64(thrd_id);
DeviceId device_id = stream_id.device_id();
CHECK_EQ(device_id.device_type(), DeviceType::kGPU);
return device_id.device_index();
}
DeviceType IDMgr::GetDeviceTypeFromActorId(int64_t actor_id) const {
int64_t thrd_id = ThrdId4ActorId(actor_id);
return GetDeviceTypeFromThrdId(thrd_id);
return DeserializeTaskIdFromInt64(actor_id).stream_id().device_id().device_type();
}
int64_t IDMgr::MachineId4ActorId(int64_t actor_id) const {
return actor_id >> (63 - machine_id_bit_num_);
// TODO: change this interface's semantics; rank does not indicate machine_id in multi-client mode
return DeserializeTaskIdFromInt64(actor_id).stream_id().device_id().rank();
}
int64_t IDMgr::ThrdId4ActorId(int64_t actor_id) const {
int64_t tmp = (actor_id << machine_id_bit_num_);
tmp &= ~(static_cast<int64_t>(1) << 63);
return tmp >> (63 - thread_id_bit_num_);
return SerializeStreamIdToInt64(DeserializeTaskIdFromInt64(actor_id).stream_id());
}
int64_t IDMgr::GlobalWorkStreamId4TaskId(int64_t task_id) const {
return (task_id >> task_id_bit_num_) << task_id_bit_num_;
return SerializeStreamIdToInt64(DeserializeTaskIdFromInt64(task_id).stream_id());
}
int64_t IDMgr::GlobalWorkStreamId4ActorId(int64_t actor_id) const {
@@ -87,12 +53,15 @@ int64_t IDMgr::GlobalWorkStreamId4ActorId(int64_t actor_id) const {
}
int64_t IDMgr::GlobalThrdId4TaskId(int64_t task_id) const {
int shift = local_work_stream_id_bit_num_ + task_id_bit_num_;
return (task_id >> shift) << shift;
return SerializeStreamIdToInt64(DeserializeTaskIdFromInt64(task_id).stream_id());
}
int64_t IDMgr::PickCpuThrdIdEvenly(int64_t machine_id) {
return GetCpuDeviceThrdId(machine_id2num_cpu_thrd_id_picked_[machine_id]++ % cpu_device_num_);
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kCPU,
DeviceId::kCPUDeviceIndex};
auto* stream_index_generator = GetStreamIndexGeneratorManager()->GetGenerator(device_id);
StreamId stream_id{device_id, stream_index_generator->GenerateComputeStreamIndex()};
return SerializeStreamIdToInt64(stream_id);
}
IDMgr::IDMgr() {
@@ -104,14 +73,6 @@ IDMgr::IDMgr() {
regst_desc_id_count_ = 0;
mem_block_id_count_ = 0;
chunk_id_count_ = 0;
base_independent_thrd_id_ = TickTockThrdId() + 1;
}
int64_t IDMgr::GetMachineThrdId(int64_t machine_id, int64_t thrd_id) {
int64_t machine_id64bit = machine_id << (63 - machine_id_bit_num_);
int64_t thread_id64bit = thrd_id << (local_work_stream_id_bit_num_ + task_id_bit_num_);
int64_t machine_thread_id = machine_id64bit | thread_id64bit;
return machine_thread_id;
}
} // namespace oneflow
@@ -20,6 +20,8 @@ limitations under the License.
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/job/global_for.h"
#include "oneflow/core/device/stream_index.h"
#include "oneflow/core/graph/task_id_generator.h"
namespace oneflow {
@@ -28,26 +30,12 @@ class IDMgr final {
OF_DISALLOW_COPY_AND_MOVE(IDMgr);
~IDMgr() = default;
// Get RegstDescId, MemBlockId, ChunkId
int64_t NewRegstDescId() { return regst_desc_id_count_++; }
int64_t NewMemBlockId() { return mem_block_id_count_++; }
int64_t NewChunkId() { return chunk_id_count_++; }
// MemZoneId
int64_t CpuMemZoneId() const { return gpu_device_num_; }
bool IsCpuMemZone(int64_t mem_zone_id) const { return mem_zone_id == CpuMemZoneId(); }
bool IsGpuMemZone(int64_t mem_zone_id) const { return mem_zone_id < gpu_device_num_; }
int64_t GpuMemZoneId(int64_t dev_phy_id) const { return dev_phy_id; }
@@ -76,20 +64,20 @@ class IDMgr final {
int64_t GlobalWorkStreamId4TaskId(int64_t task_id) const;
int64_t PickCpuThrdIdEvenly(int64_t machine_id);
StreamIndexGeneratorManager* GetStreamIndexGeneratorManager() { return &stream_index_gen_mgr_; }
TaskIdGenerator* GetTaskIdGenerator() { return &task_id_gen_; }
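// Sketch of intended use during task graph construction; the
// Generate(const StreamId&) signature is assumed from this PR, not verified:
//   StreamId stream_id = ...;
//   TaskId task_id = Global<IDMgr>::Get()->GetTaskIdGenerator()->Generate(stream_id);
//   int64_t serialized = SerializeTaskIdToInt64(task_id);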
private:
friend class Global<IDMgr>;
IDMgr();
int64_t gpu_device_num_;
int64_t cpu_device_num_;
int64_t regst_desc_id_count_;
int64_t mem_block_id_count_;
int64_t chunk_id_count_;
StreamIndexGeneratorManager stream_index_gen_mgr_;
TaskIdGenerator task_id_gen_;
// 64 bit id design:
// sign | machine | thread | local_work_stream | task
......
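For reference, the legacy 64-bit layout named in the comment above packed ids as
sign | machine | thread | local_work_stream | task. A minimal sketch of that
packing, with hypothetical bit widths (the real machine_id_bit_num_ /
thread_id_bit_num_ values were derived from the cluster configuration):

#include <cstdint>

// Illustrative widths only: 10 + 11 + 21 + 21 = 63, leaving the sign bit clear.
constexpr int kMachineBits = 10;
constexpr int kThreadBits = 11;
constexpr int kStreamBits = 21;
constexpr int kTaskBits = 21;

inline int64_t PackLegacyId(int64_t machine, int64_t thread, int64_t stream, int64_t task) {
  return (machine << (kThreadBits + kStreamBits + kTaskBits))
         | (thread << (kStreamBits + kTaskBits)) | (stream << kTaskBits) | task;
}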
@@ -58,21 +58,6 @@ void Delete() {
} // namespace
TEST(IDMgr, compile_regst_desc_id) {
New();
ASSERT_EQ(Global<IDMgr>::Get()->NewRegstDescId(), 0);
@@ -81,31 +66,4 @@ TEST(IDMgr, compile_regst_desc_id) {
Delete();
}
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/job/thrd_id_generator.h"
#include "oneflow/core/graph/task_node.h"
namespace oneflow {
ThrdIdGenerator::ThrdIdGenerator(std::vector<std::pair<int64_t, TaskType>>& machine_task_type_vec,
int64_t base_thrd_id)
: base_thrd_id_(base_thrd_id) {
HashMap<int64_t, std::set<TaskType>> machine2task_types;
// machine_task_type = <machine_id, task_type>
for (const auto& machine_task_type : machine_task_type_vec) {
if (IsClassRegistered<int32_t, TickTockTaskType>(machine_task_type.second)) { continue; }
if (TaskTypeThrdNumEqMax(machine_task_type.second,
machine_task_type2thrd_num_[machine_task_type])) {
continue;
}
machine_task_type2thrd_num_[machine_task_type]++;
machine2task_types[machine_task_type.first].emplace(machine_task_type.second);
}
InitLowerboundOfTaskType(machine2task_types);
}
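// Counting note: the loop above caps the thread count reserved for each
// <machine_id, task_type> pair at its registered IndependentThreadNum4TaskType
// (when one exists), so repeated occurrences of the same pair beyond that cap
// do not reserve additional threads.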
int64_t ThrdIdGenerator::GenerateThrdId(int64_t machine_id, int64_t task_type) {
if (IsClassRegistered<int32_t, TickTockTaskType>(task_type)) {
return Global<IDMgr>::Get()->TickTockThrdId();
}
auto key = std::make_pair(machine_id, task_type);
int64_t ret = machine_task_type2lowerbound_[key] + GetModThrdId(key);
CHECK_GE(ret, 0);
Global<IDMgr>::Get()->UpdateBaseIndependentThrdId(ret);
return ret;
}
int64_t ThrdIdGenerator::GetModThrdId(std::pair<int64_t, int64_t> machine_task_type) {
int64_t& offset = machine_task_type2offset_[machine_task_type];
int64_t mod_thrd_id = offset % machine_task_type2thrd_num_[machine_task_type];
offset++;
return mod_thrd_id;
}
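// Worked example (hypothetical numbers): with 3 threads reserved for a
// <machine_id, task_type> key, successive calls yield 0, 1, 2, 0, 1, ...,
// spreading tasks of that type evenly over its reserved thread range.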
bool ThrdIdGenerator::TaskTypeThrdNumEqMax(int64_t task_type, int32_t thrd_num) {
if (IsClassRegistered<int32_t, IndependentThreadNum4TaskType>(task_type)) {
std::unique_ptr<IndependentThreadNum4TaskType> thread_num;
thread_num.reset(NewObj<int32_t, IndependentThreadNum4TaskType>(task_type));
return (thrd_num == *thread_num);
} else {
return false;
}
}
void ThrdIdGenerator::InitLowerboundOfTaskType(
const HashMap<int64_t, std::set<TaskType>>& machine2task_types) {
for (const auto& pair : machine2task_types) {
int64_t machine_id = pair.first;
auto& task_types = pair.second;
int64_t lowerbound = base_thrd_id_;
for (int64_t task_type : task_types) {
auto machine_task_type = std::make_pair(machine_id, task_type);
machine_task_type2lowerbound_[machine_task_type] = lowerbound;
lowerbound += machine_task_type2thrd_num_[machine_task_type];
}
}
}
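// Layout example (hypothetical numbers): with base_thrd_id_ == 8 and task
// types A (2 threads) and B (1 thread) on one machine, A occupies thread
// ids [8, 9] (lowerbound 8) and B occupies [10] (lowerbound 10).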
} // namespace oneflow
@@ -22,48 +22,62 @@ limitations under the License.
#include "oneflow/core/common/blocking_counter.h"
#include "oneflow/core/control/global_process_ctx.h"
#include "oneflow/core/job/global_for.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"
namespace oneflow {
namespace {

Thread* NewThread(StreamId stream_id) {
  int64_t thrd_id = SerializeStreamIdToInt64(stream_id);
  DeviceId device_id = stream_id.device_id();
  Thread* thread = nullptr;
  switch (device_id.device_type()) {
#ifdef WITH_CUDA
    case DeviceType::kGPU: {
      thread = new GpuThread(thrd_id, static_cast<int64_t>(device_id.device_index()));
      break;
    }
#endif
    case DeviceType::kCPU: {
      thread = new CpuThread(thrd_id);
      break;
    }
    default: { UNIMPLEMENTED(); }
  }
  return thread;
}

}  // namespace

ThreadMgr::~ThreadMgr() {
  for (auto& thread_pair : threads_) {
    ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread);
    thread_pair.second->GetMsgChannelPtr()->Send(msg);
    thread_pair.second.reset();
    LOG(INFO) << "actor thread " << thread_pair.first << " finish";
  }
}

Thread* ThreadMgr::GetThrd(int64_t thrd_id) {
  auto iter = threads_.find(thrd_id);
  CHECK(iter != threads_.end()) << "thread " << thrd_id << " not found";
  return iter->second.get();
}

ThreadMgr::ThreadMgr(const Plan& plan) {
  const int64_t this_rank = GlobalProcessCtx::Rank();
  for (const TaskProto& task : plan.task()) {
    TaskId task_id = DeserializeTaskIdFromInt64(task.task_id());
    StreamId stream_id = task_id.stream_id();
    if (stream_id.device_id().rank() != this_rank) { continue; }
    int64_t thrd_id = SerializeStreamIdToInt64(stream_id);
    if (threads_.find(thrd_id) != threads_.end()) { continue; }
    Thread* thread = NewThread(stream_id);
    CHECK_NOTNULL(thread);
    threads_[thrd_id].reset(thread);
  }
}
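// Design note: threads are now created lazily, one per distinct StreamId found
// in this rank's portion of the plan, rather than eagerly allocating a fixed
// pool per device; the serialized StreamId doubles as the map key.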
void SingleThreadLoop(size_t num, std::function<void(size_t i)> Callback) {
......
@@ -37,9 +37,7 @@ class ThreadMgr final {
friend class Global<ThreadMgr>;
explicit ThreadMgr(const Plan& plan);
HashMap<int64_t, std::unique_ptr<Thread>> threads_;
};
void SingleThreadLoop(size_t num, std::function<void(size_t i)> Callback);
......