Unverified Commit 533018ae, authored by Li Xinqi and committed by GitHub

Local dep object pool (#5953)

* GetLocalDepObjectPool

* fix compiler complaints
Co-authored-by: Houjiang Chen <chenhoujiangcug@gmail.com>
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
Parent e7e39aa1
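
In short, this PR drops the old per-ParallelDesc zombie/free-list recycling of `VmLocalDepObject` and instead keeps a thread-local, per-device pool of `LocalDepObject`s that callers draw from round-robin through `GetLocalDepObject(Symbol<Device>)`. The snippet below is a minimal standalone sketch of that pooling pattern, not the OneFlow code itself; `Dep`, `PoolForDevice`, and `GetDep` are illustrative stand-ins for the real types and helpers.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for vm::LocalDepObject; the real type is an OBJECT_MSG struct
// managed through ObjectMsgPtr, not a plain unique_ptr.
struct Dep {
  std::string device;
};

// One pool per device key, built lazily and cached per thread
// (the PR gets the same effect with DECORATE(&RawGetLocalDepObjectPool, ThreadLocal)).
std::vector<std::unique_ptr<Dep>>& PoolForDevice(const std::string& device,
                                                 std::size_t pool_size) {
  thread_local std::unordered_map<std::string, std::vector<std::unique_ptr<Dep>>> pools;
  auto& pool = pools[device];
  if (pool.empty()) {
    pool.reserve(pool_size);
    for (std::size_t i = 0; i < pool_size; ++i) {
      pool.push_back(std::make_unique<Dep>(Dep{device}));
    }
  }
  return pool;
}

// Hand entries out round-robin, like GetLocalDepObject(Symbol<Device>) in the PR.
Dep* GetDep(const std::string& device, std::size_t pool_size) {
  auto& pool = PoolForDevice(device, pool_size);
  thread_local std::int64_t index = 0;
  return pool[static_cast<std::size_t>(index++) % pool.size()].get();
}

int main() {
  // nccl / cuda_h2d / cuda_d2h streams use a 2-entry double buffer;
  // cpu / cuda / gpu compute streams use the instruction high-water mark (500).
  Dep* a = GetDep("nccl", 2);
  Dep* b = GetDep("nccl", 2);
  Dep* c = GetDep("nccl", 2);
  return (a != b && c == a) ? 0 : 1;  // the third request wraps back to the first object
}
```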
......@@ -15,7 +15,6 @@ limitations under the License.
*/
#include "oneflow/core/eager/eager_blob_object.h"
#include "oneflow/core/vm/allocator.h"
#include "oneflow/core/job/parallel_desc.h"
#include "oneflow/core/framework/to_string.h"
#include "oneflow/core/framework/shut_down_util.h"
#include "oneflow/core/common/shape_vec.h"
......@@ -23,24 +22,15 @@ limitations under the License.
namespace oneflow {
namespace vm {
namespace {
Maybe<VmLocalDepObject> GetVmLocalDepObject(
const std::shared_ptr<const ParallelDesc>& parallel_desc) {
return parallel_desc != nullptr
? Maybe<VmLocalDepObject>(std::make_shared<VmLocalDepObject>(parallel_desc))
: Error::Unimplemented();
}
} // namespace
EagerBlobObject::EagerBlobObject(const std::shared_ptr<MemoryCase>& mem_case,
const std::shared_ptr<Shape>& shape, DataType data_type,
const std::shared_ptr<TensorBuffer>& tensor_buffer,
const std::shared_ptr<const ParallelDesc>& parallel_desc)
const Optional<LocalDepObject*>& dep_object)
: BlobObject(mem_case, shape, data_type),
tensor_buffer_(tensor_buffer),
blob_body_bytes_(0),
is_shape_synced_(true),
compute_local_dep_object_(GetVmLocalDepObject(parallel_desc)) {
compute_local_dep_object_(dep_object) {
CHECK(static_cast<bool>(shape));
CHECK(static_cast<bool>(tensor_buffer));
non_pod_initer_ = std::make_unique<MemoryAllocator>();
......
......@@ -17,8 +17,9 @@ limitations under the License.
#define ONEFLOW_CORE_EAGER_EAGER_BLOB_OBJECT_H_
#include "oneflow/core/common/maybe.h"
#include "oneflow/core/common/optional.h"
#include "oneflow/core/eager/blob_object.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
#include "oneflow/core/memory/memory_allocator.h"
namespace oneflow {
......@@ -44,10 +45,14 @@ class EagerBlobObject final : public BlobObject {
EagerBlobObject(EagerBlobObject&&) = delete;
EagerBlobObject(const std::shared_ptr<MemoryCase>& mem_case, const std::shared_ptr<Shape>& shape,
DataType data_type, const std::shared_ptr<TensorBuffer>& tensor_buffer)
: EagerBlobObject(mem_case, shape, data_type, tensor_buffer, nullptr) {}
: EagerBlobObject(mem_case, shape, data_type, tensor_buffer, Optional<LocalDepObject*>()) {}
EagerBlobObject(const std::shared_ptr<MemoryCase>& mem_case, const std::shared_ptr<Shape>& shape,
DataType data_type, const std::shared_ptr<TensorBuffer>& tensor_buffer,
const std::shared_ptr<const ParallelDesc>& parallel_desc);
LocalDepObject* dep_object)
: EagerBlobObject(mem_case, shape, data_type, tensor_buffer,
Optional<LocalDepObject*>(dep_object)) {}
~EagerBlobObject() override {
non_pod_initer_.reset();
tensor_buffer_.reset();
......@@ -69,7 +74,9 @@ class EagerBlobObject final : public BlobObject {
return Maybe<void>::Ok();
}
Maybe<VmLocalDepObject> compute_local_dep_object() const { return compute_local_dep_object_; }
Maybe<LocalDepObject*> compute_local_dep_object() const {
return compute_local_dep_object_.value();
}
std::shared_ptr<TensorBuffer>& tensor_buffer() { return tensor_buffer_; }
......@@ -78,13 +85,17 @@ class EagerBlobObject final : public BlobObject {
void set_is_shape_synced(bool val) { is_shape_synced_ = val; }
private:
EagerBlobObject(const std::shared_ptr<MemoryCase>& mem_case, const std::shared_ptr<Shape>& shape,
DataType data_type, const std::shared_ptr<TensorBuffer>& tensor_buffer,
const Optional<LocalDepObject*>& dep_object);
std::unique_ptr<Blob> blob_;
std::unique_ptr<char, std::function<void(char*)>> header_buffer_;
std::shared_ptr<TensorBuffer> tensor_buffer_;
std::size_t blob_body_bytes_;
std::unique_ptr<MemoryAllocator> non_pod_initer_;
std::atomic<bool> is_shape_synced_;
Maybe<VmLocalDepObject> compute_local_dep_object_;
Optional<LocalDepObject*> compute_local_dep_object_;
};
} // namespace vm
......
......@@ -26,9 +26,7 @@ void LocalCallOpKernelPhyInstrOperand::ForEachConstMirroredObject(
const auto& input_list = inputs();
for (int64_t index : opkernel().input_tuple_indexes4const_ibns()) {
const auto& input = input_list->at(index);
DoEach(nullptr, CHECK_JUST(input->compute_local_dep_object())
->mut_local_dep_object()
->mut_mirrored_object());
DoEach(nullptr, CHECK_JUST(input->compute_local_dep_object())->mut_mirrored_object());
}
}
......@@ -38,27 +36,23 @@ void LocalCallOpKernelPhyInstrOperand::ForEachMutMirroredObject(
auto* device_dep_object = opkernel().device()->mut_compute_local_dep_object();
if (opkernel().device()->type() == "nccl") {
// Sequentialize nccl instructions to avoid deadlock
DoEach(nullptr, device_dep_object->mut_local_dep_object()->mut_mirrored_object());
DoEach(nullptr, device_dep_object->mut_mirrored_object());
} else {
// Sequentialize instructions to avoid explosive memory allocation of source ops
if (dev_vm_dep_object_consume_mode() == one::DevVmDepObjectConsumeMode::MUTABLE) {
DoEach(nullptr, device_dep_object->mut_local_dep_object()->mut_mirrored_object());
DoEach(nullptr, device_dep_object->mut_mirrored_object());
}
}
const auto& input_list = inputs();
for (int64_t index : opkernel().input_tuple_indexes4mut_ibns()) {
const auto& input = input_list->at(index);
DoEach(nullptr, CHECK_JUST(input->compute_local_dep_object())
->mut_local_dep_object()
->mut_mirrored_object());
DoEach(nullptr, CHECK_JUST(input->compute_local_dep_object())->mut_mirrored_object());
}
const auto& output_list = outputs();
for (int64_t index : opkernel().output_tuple_indexes4mut_obns()) {
const auto& output = output_list->at(index);
DoEach(nullptr, CHECK_JUST(output->compute_local_dep_object())
->mut_local_dep_object()
->mut_mirrored_object());
DoEach(nullptr, CHECK_JUST(output->compute_local_dep_object())->mut_mirrored_object());
}
}
......@@ -68,9 +62,7 @@ void LocalCallOpKernelPhyInstrOperand::ForEachMut2MirroredObject(
const auto& output_list = outputs();
for (int64_t index : opkernel().output_tuple_indexes4mut2_obns()) {
const auto& output = output_list->at(index);
DoEach(nullptr, CHECK_JUST(output->compute_local_dep_object())
->mut_local_dep_object()
->mut_mirrored_object());
DoEach(nullptr, CHECK_JUST(output->compute_local_dep_object())->mut_mirrored_object());
}
}
......
......@@ -22,9 +22,7 @@ void RunLazyJobPhyInstrOperand::ForEachConstMirroredObject(
const std::function<void(vm::MirroredObject* infer, vm::MirroredObject* compute)>& DoEach)
const {
for (const auto& input : *inputs()) {
DoEach(nullptr, CHECK_JUST(input->compute_local_dep_object())
->mut_local_dep_object()
->mut_mirrored_object());
DoEach(nullptr, CHECK_JUST(input->compute_local_dep_object())->mut_mirrored_object());
}
}
......@@ -32,9 +30,7 @@ void RunLazyJobPhyInstrOperand::ForEachMutMirroredObject(
const std::function<void(vm::MirroredObject* infer, vm::MirroredObject* compute)>& DoEach)
const {
for (const auto& parameter : *parameters()) {
DoEach(nullptr, CHECK_JUST(parameter->compute_local_dep_object())
->mut_local_dep_object()
->mut_mirrored_object());
DoEach(nullptr, CHECK_JUST(parameter->compute_local_dep_object())->mut_mirrored_object());
}
}
......@@ -44,9 +40,7 @@ void RunLazyJobPhyInstrOperand::ForEachMut2MirroredObject(
// TODO(lixinqi): move part of the outputs into ForEachMutMirroredObject if shape inferred before
// compute.
for (const auto& output : *outputs()) {
DoEach(nullptr, CHECK_JUST(output->compute_local_dep_object())
->mut_local_dep_object()
->mut_mirrored_object());
DoEach(nullptr, CHECK_JUST(output->compute_local_dep_object())->mut_mirrored_object());
}
}
......
......@@ -15,7 +15,7 @@ limitations under the License.
*/
#include <sstream>
#include "oneflow/core/framework/device.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
#include "oneflow/core/control/global_process_ctx.h"
#include "oneflow/core/common/str_util.h"
#include "oneflow/core/job/resource_desc.h"
......@@ -35,18 +35,19 @@ inline size_t HashDevice(const std::string& type, int64_t device_id) {
return std::hash<std::string>()(type) ^ std::hash<int64_t>()(device_id);
}
Maybe<VmLocalDepObject> FindOrCreateComputeLocalDepObject(const Device& device) {
Maybe<LocalDepObject*> FindOrCreateComputeLocalDepObject(const Device& device) {
static std::mutex mutex;
static HashMap<Device, std::shared_ptr<VmLocalDepObject>> device2dep_object;
static HashMap<Device, ObjectMsgPtr<LocalDepObject>> device2dep_object;
{
std::unique_lock<std::mutex> lock(mutex);
const auto& iter = device2dep_object.find(device);
if (iter != device2dep_object.end()) { return iter->second; }
if (iter != device2dep_object.end()) { return iter->second.Mutable(); }
}
const auto& dep_object = std::make_shared<VmLocalDepObject>(device.parallel_desc_ptr());
auto dep_object = ObjectMsgPtr<LocalDepObject>::New();
JUST(dep_object.Mutable()->Init(device));
{
std::unique_lock<std::mutex> lock(mutex);
return device2dep_object.emplace(device, dep_object).first->second;
return device2dep_object.emplace(device, dep_object).first->second.Mutable();
}
}
......@@ -105,6 +106,16 @@ Maybe<const std::string&> GetLocalCallInstructionName(const std::string& type) {
return MapAt(type2instr_name, type);
}
Maybe<size_t> Device::instr_local_dep_object_pool_size() const {
static const size_t kDoubleBufferPoolSize = 2;
static const HashMap<std::string, size_t> type2pool_size{
{"cpu", GetInstructionHighWaterMark()}, {"cuda", GetInstructionHighWaterMark()},
{"gpu", GetInstructionHighWaterMark()}, {"cuda_h2d", kDoubleBufferPoolSize},
{"cuda_d2h", kDoubleBufferPoolSize}, {"nccl", kDoubleBufferPoolSize},
};
return MapAt(type2pool_size, type());
}
Maybe<const std::string&> Device::local_call_instruction_name() const {
return GetLocalCallInstructionName(type());
}
......
......@@ -26,7 +26,10 @@ namespace oneflow {
class ParallelDesc;
class MemoryCase;
class VmLocalDepObject;
class LocalDepObject;
inline size_t GetInstructionHighWaterMark() { return 500; }
inline size_t GetInstructionLowWaterMark() { return 200; }
class Device final {
public:
......@@ -59,7 +62,8 @@ class Device final {
static std::string Type4DeviceTag(const std::string& device_tag);
Maybe<const std::string&> local_call_instruction_name() const;
VmLocalDepObject* mut_compute_local_dep_object() const { return compute_local_dep_object_.get(); }
LocalDepObject* mut_compute_local_dep_object() const { return compute_local_dep_object_; }
Maybe<size_t> instr_local_dep_object_pool_size() const;
private:
Device(const std::string& type, int64_t device_id);
......@@ -69,7 +73,7 @@ class Device final {
const int64_t device_id_;
const size_t hash_value_;
std::shared_ptr<MemoryCase> mem_case_;
std::shared_ptr<VmLocalDepObject> compute_local_dep_object_;
LocalDepObject* compute_local_dep_object_;
};
Maybe<const std::string&> GetLocalCallInstructionName(const std::string& device_tag);
......
......@@ -36,7 +36,7 @@ limitations under the License.
#include "oneflow/core/vm/release_tensor_arg_phy_instr_operand.h"
#include "oneflow/core/vm/soft_sync_stream_phy_instr_operand.h"
#include "oneflow/core/framework/consistent_tensor_infer_cache.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
#include "oneflow/core/framework/tensor.h"
#include "oneflow/core/framework/device.h"
#include "oneflow/core/framework/instruction_replay.h"
......@@ -908,8 +908,7 @@ Maybe<void> InstructionsBuilder::ReleaseTensor(
const std::shared_ptr<const ParallelDesc>& parallel_desc) {
std::string instr_name = parallel_desc->device_tag() + ".ReleaseTensor";
ObjectMsgPtr<vm::InstructionMsg> instruction = ObjectMsgPtr<vm::InstructionMsg>::New(instr_name);
const std::shared_ptr<VmLocalDepObject>& compute_local_dep_object =
JUST(eager_blob_object->compute_local_dep_object());
LocalDepObject* compute_local_dep_object = JUST(eager_blob_object->compute_local_dep_object());
*instruction->mutable_phy_instr_operand() = std::make_shared<vm::ReleaseTensorArgPhyInstrOperand>(
eager_blob_object, compute_local_dep_object);
*instruction->mut_parallel_desc() = parallel_desc;
......@@ -918,7 +917,7 @@ Maybe<void> InstructionsBuilder::ReleaseTensor(
}
Maybe<void> InstructionsBuilder::SoftSyncStream(
const std::shared_ptr<VmLocalDepObject> compute_local_dep_object, const std::string& modifier,
LocalDepObject* compute_local_dep_object, const std::string& modifier,
const std::shared_ptr<const ParallelDesc>& parallel_desc) {
ObjectMsgPtr<vm::InstructionMsg> instruction =
ObjectMsgPtr<vm::InstructionMsg>::New(parallel_desc->device_tag() + ".SoftSyncStream");
......@@ -975,8 +974,7 @@ Maybe<void> InstructionsBuilder::AccessBlobByCallback(const T tensor,
std::string instr_name = parallel_desc->device_tag() + ".AccessBlobByCallback";
ObjectMsgPtr<vm::InstructionMsg> instruction = ObjectMsgPtr<vm::InstructionMsg>::New(instr_name);
const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object = JUST(tensor->eager_blob_object());
const std::shared_ptr<VmLocalDepObject>& compute_local_dep_object =
JUST(tensor->compute_local_dep_object());
LocalDepObject* compute_local_dep_object = JUST(tensor->compute_local_dep_object());
*instruction->mutable_phy_instr_operand() = std::make_shared<vm::AccessBlobArgCbPhyInstrOperand>(
eager_blob_object, compute_local_dep_object, callback, modifier);
*instruction->mut_parallel_desc() = parallel_desc;
......
......@@ -131,8 +131,7 @@ class InstructionsBuilder : public std::enable_shared_from_this<InstructionsBuil
Maybe<void> ReleaseTensor(const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object,
const std::shared_ptr<const ParallelDesc>& parallel_desc);
Maybe<void> SoftSyncStream(const std::shared_ptr<VmLocalDepObject> compute_local_dep_object,
const std::string& modifier,
Maybe<void> SoftSyncStream(LocalDepObject* compute_local_dep_object, const std::string& modifier,
const std::shared_ptr<const ParallelDesc>& parallel_desc);
template<typename T>
......
......@@ -13,8 +13,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
#include "oneflow/core/framework/device.h"
#include "oneflow/core/common/global.h"
#include "oneflow/core/common/decorator.h"
#include "oneflow/core/vm/id_util.h"
#include "oneflow/core/vm/vm_object.msg.h"
#include "oneflow/core/vm/oneflow_vm.h"
......@@ -22,9 +24,9 @@ limitations under the License.
#include "oneflow/core/control/global_process_ctx.h"
namespace oneflow {
namespace vm {
void LocalDepObject::__Init__(const std::shared_ptr<const ParallelDesc>& parallel_desc) {
Maybe<void> LocalDepObject::Init(const Device& device) {
const auto& parallel_desc = device.parallel_desc_ptr();
vm::ObjectId object_id = vm::IdUtil::NewPhysicalValueObjectId(GlobalProcessCtx::Rank());
int64_t global_device_id = 0;
{
......@@ -39,79 +41,33 @@ void LocalDepObject::__Init__(const std::shared_ptr<const ParallelDesc>& paralle
mutable_logical_object()->__Init__(object_id,
std::const_pointer_cast<ParallelDesc>(parallel_desc));
mutable_mirrored_object()->__Init__(mutable_logical_object(), global_device_id);
return Maybe<void>::Ok();
}
namespace {
using LocalDepObjectFreeList = OBJECT_MSG_LIST(LocalDepObject, free_link);
using LocalDepObjectZombieList = OBJECT_MSG_LIST(LocalDepObject, zombie_link);
using LocalDepObjectMutexedZombieList = OBJECT_MSG_MUTEXED_LIST(LocalDepObject, zombie_link);
LocalDepObjectFreeList* ThreadLocalMutFreeList4ParallelDesc(const ParallelDesc& parallel_desc) {
thread_local static HashMap<ParallelDesc, LocalDepObjectFreeList> pd2free_list;
return &pd2free_list[parallel_desc];
}
LocalDepObjectMutexedZombieList* StaticMutLocalDepObjectMutexedZombieList() {
static LocalDepObjectMutexedZombieList zombie_list;
return &zombie_list;
}
void TryMoveFromZombieListToFreeList() {
thread_local static LocalDepObjectZombieList zombie_list;
if (zombie_list.empty()) { StaticMutLocalDepObjectMutexedZombieList()->MoveTo(&zombie_list); }
static const size_t kTryCnt = 8;
size_t try_cnt = kTryCnt;
OBJECT_MSG_LIST_FOR_EACH(&zombie_list, zombie_object) {
zombie_list.Erase(zombie_object.Mutable());
size_t ref_cnt = zombie_object->ref_cnt();
if (ref_cnt == 1 /* hold by `zombie_object` only */) {
CHECK_EQ(zombie_object->mirrored_object().rw_mutexed_object().ref_cnt(), 1);
CHECK(zombie_object->mirrored_object().rw_mutexed_object().access_list().empty());
const auto& parallel_desc = *zombie_object->logical_object().parallel_desc();
auto* thread_local_free_list = ThreadLocalMutFreeList4ParallelDesc(parallel_desc);
thread_local_free_list->EmplaceBack(std::move(zombie_object));
} else {
CHECK_GT(ref_cnt, 1);
zombie_list.EmplaceBack(std::move(zombie_object));
}
if (--try_cnt < 0) { break; }
}
}
ObjectMsgPtr<LocalDepObject> GetRecycledLocalDepObject(const ParallelDesc& parallel_desc) {
auto* thread_local_free_list = ThreadLocalMutFreeList4ParallelDesc(parallel_desc);
if (thread_local_free_list->empty()) {
TryMoveFromZombieListToFreeList();
if (thread_local_free_list->empty()) { return nullptr; }
}
ObjectMsgPtr<LocalDepObject> object = thread_local_free_list->Begin();
thread_local_free_list->Erase(object.Mutable());
CHECK_EQ(object->ref_cnt(), 1); // hold by `object` only
return object;
}
void MoveLocalDepObjectToZombieList(ObjectMsgPtr<LocalDepObject>&& local_dep_object) {
static const size_t kGroupSize = 16;
thread_local static LocalDepObjectZombieList zombie_list;
zombie_list.EmplaceBack(std::move(local_dep_object));
if (zombie_list.size() >= kGroupSize) {
StaticMutLocalDepObjectMutexedZombieList()->MoveFrom(&zombie_list);
Maybe<std::vector<ObjectMsgPtr<LocalDepObject>>> RawGetLocalDepObjectPool(Symbol<Device> device) {
const auto pool = std::make_shared<std::vector<ObjectMsgPtr<LocalDepObject>>>();
size_t pool_size = JUST(device->instr_local_dep_object_pool_size());
pool->reserve(pool_size);
for (int64_t i = 0; i < pool_size; ++i) {
auto local_dep_object = ObjectMsgPtr<LocalDepObject>::New();
JUST(local_dep_object->Init(*device));
pool->push_back(local_dep_object);
}
return pool;
}
} // namespace
} // namespace vm
VmLocalDepObject::VmLocalDepObject(const std::shared_ptr<const ParallelDesc>& parallel_desc) {
local_dep_object_ = vm::GetRecycledLocalDepObject(*parallel_desc);
if (!local_dep_object_) {
local_dep_object_ = ObjectMsgPtr<vm::LocalDepObject>::New(parallel_desc);
}
}
static constexpr auto* GetLocalDepObjectPool = DECORATE(&RawGetLocalDepObjectPool, ThreadLocal);
VmLocalDepObject::~VmLocalDepObject() {
vm::MoveLocalDepObjectToZombieList(std::move(local_dep_object_));
Maybe<LocalDepObject*> GetLocalDepObject(Symbol<Device> device) {
const auto& local_dep_object_pool = JUST(GetLocalDepObjectPool(device));
CHECK_OR_RETURN(!local_dep_object_pool->empty());
size_t pool_size = local_dep_object_pool->size();
static thread_local int64_t index = 0;
return local_dep_object_pool->at(index++ % pool_size).Mutable();
}
} // namespace oneflow
......@@ -13,28 +13,28 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_FRAMEWORK_VM_LOCAL_DEP_OBJECT_H_
#define ONEFLOW_CORE_FRAMEWORK_VM_LOCAL_DEP_OBJECT_H_
#ifndef ONEFLOW_CORE_FRAMEWORK_LOCAL_DEP_OBJECT_H_
#define ONEFLOW_CORE_FRAMEWORK_LOCAL_DEP_OBJECT_H_
#include "oneflow/core/object_msg/object_msg_core.h"
#include "oneflow/core/vm/vm_object.msg.h"
#include "oneflow/core/common/maybe.h"
#include "oneflow/core/common/symbol.h"
namespace oneflow {
class ParallelDesc;
namespace vm {
class Device;
// clang-format off
// Helps VirtualMachine building instruction edges
OBJECT_MSG_BEGIN(LocalDepObject);
// methods
OF_PUBLIC void __Init__(const std::shared_ptr<const ParallelDesc>& parallel_desc);
OF_PUBLIC Maybe<void> Init(const Device& device);
// fields
OBJECT_MSG_DEFINE_OPTIONAL(LogicalObject, logical_object);
OBJECT_MSG_DEFINE_OPTIONAL(MirroredObject, mirrored_object);
OBJECT_MSG_DEFINE_OPTIONAL(vm::LogicalObject, logical_object);
OBJECT_MSG_DEFINE_OPTIONAL(vm::MirroredObject, mirrored_object);
// links
OBJECT_MSG_DEFINE_LIST_LINK(free_link);
......@@ -42,19 +42,8 @@ OBJECT_MSG_BEGIN(LocalDepObject);
OBJECT_MSG_END(LocalDepObject);
// clang-format on
} // namespace vm
class VmLocalDepObject final {
public:
explicit VmLocalDepObject(const std::shared_ptr<const ParallelDesc>& parallel_desc);
~VmLocalDepObject();
const ObjectMsgPtr<vm::LocalDepObject>& local_dep_object() const { return local_dep_object_; }
vm::LocalDepObject* mut_local_dep_object() { return local_dep_object_.Mutable(); }
Maybe<LocalDepObject*> GetLocalDepObject(Symbol<Device> device);
private:
ObjectMsgPtr<vm::LocalDepObject> local_dep_object_;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_FRAMEWORK_VM_LOCAL_DEP_OBJECT_H_
#endif // ONEFLOW_CORE_FRAMEWORK_LOCAL_DEP_OBJECT_H_
......@@ -148,7 +148,8 @@ Maybe<void> NaiveInterpret(const UserOpExpr& user_op_expr, const TensorTuple& in
auto* tensor_impl = JUST(TensorImpl4Tensor(outputs->at(i)));
if (!output_eager_blob_objects->at(i)) {
tensor_impl->mut_tensor_meta()->set_stride(std::make_shared<Stride>(*tensor_impl->shape()));
JUST(tensor_impl->InitEagerBlobObject(JUST(outputs->at(i)->device())->mem_case()));
const auto& dep_object = JUST(GetLocalDepObject(op_device));
JUST(tensor_impl->InitEagerBlobObject(dep_object));
output_eager_blob_objects->at(i) = JUST(tensor_impl->eager_blob_object());
} else {
// output i is inplaced.
......
......@@ -52,17 +52,6 @@ Maybe<MirroredTensor> StaticZerosTensor::AsMirroredTensor() {
}
}
/* static */ Maybe<MirroredTensor> MirroredTensor::MakeEagerTensor(
const std::shared_ptr<vm::EagerBlobObject> eager_blob_object, const Symbol<Device>& device,
const std::shared_ptr<TensorStorage> tensor_storage, bool requires_grad, bool is_leaf) {
const auto& blob_desc = eager_blob_object->blob_desc();
const auto& tensor_meta =
std::make_shared<MirroredTensorMeta>(blob_desc.shape_ptr(), blob_desc.data_type(), device);
auto* tensor_impl = new EagerMirroredTensorImpl(tensor_meta, requires_grad, is_leaf);
JUST(tensor_impl->InitEagerBlobObjectAndTensorStorage(eager_blob_object, tensor_storage));
return std::make_shared<MirroredTensor>(std::shared_ptr<MirroredTensorImpl>(tensor_impl));
}
bool MirroredTensor::is_cuda() const { return CHECK_JUST(device())->type() == "cuda"; }
std::shared_ptr<Tensor> MirroredTensor::data() const {
......
......@@ -67,7 +67,7 @@ class Tensor {
// Getters valid only for EagerMirroredTensor
virtual Maybe<EagerMirroredTensorImpl*> mut_eager_mirrored_tensor_impl() { OF_UNIMPLEMENTED(); }
virtual Maybe<vm::EagerBlobObject> eager_blob_object() const = 0;
virtual Maybe<VmLocalDepObject> compute_local_dep_object() const = 0;
virtual Maybe<LocalDepObject*> compute_local_dep_object() const = 0;
virtual Maybe<bool> has_eager_blob_object() const = 0;
virtual Maybe<TensorStorage> tensor_storage() const { OF_UNIMPLEMENTED(); }
virtual Maybe<const Stride> stride() const { OF_UNIMPLEMENTED(); }
......@@ -146,7 +146,7 @@ class StaticZerosTensor final : public Tensor {
// Getters valid only for EagerMirroredTensor
Maybe<EagerMirroredTensorImpl*> mut_eager_mirrored_tensor_impl() { OF_UNIMPLEMENTED(); }
Maybe<vm::EagerBlobObject> eager_blob_object() const { OF_UNIMPLEMENTED(); }
Maybe<VmLocalDepObject> compute_local_dep_object() const { OF_UNIMPLEMENTED(); }
Maybe<LocalDepObject*> compute_local_dep_object() const { OF_UNIMPLEMENTED(); }
Maybe<bool> has_eager_blob_object() const { OF_UNIMPLEMENTED(); }
Maybe<TensorStorage> tensor_storage() const { OF_UNIMPLEMENTED(); }
Maybe<const Stride> stride() const { OF_UNIMPLEMENTED(); }
......@@ -278,7 +278,7 @@ class Parameter final : public TensorIf<Parameter> {
Maybe<vm::EagerBlobObject> eager_blob_object() const override {
return tensor_->eager_blob_object();
}
Maybe<VmLocalDepObject> compute_local_dep_object() const override {
Maybe<LocalDepObject*> compute_local_dep_object() const override {
return tensor_->compute_local_dep_object();
}
Maybe<bool> has_eager_blob_object() const override { return tensor_->has_eager_blob_object(); }
......@@ -371,7 +371,7 @@ class MirroredTensor final : public TensorIf<MirroredTensor>,
Maybe<vm::EagerBlobObject> eager_blob_object() const override {
return impl_->eager_blob_object();
}
Maybe<VmLocalDepObject> compute_local_dep_object() const override {
Maybe<LocalDepObject*> compute_local_dep_object() const override {
return impl_->compute_local_dep_object();
}
Maybe<TensorStorage> tensor_storage() const override { return impl_->tensor_storage(); }
......@@ -415,10 +415,6 @@ class MirroredTensor final : public TensorIf<MirroredTensor>,
}
user_op::TensorDesc* mut_tensor_meta() override { return impl_->mut_tensor_meta(); }
Maybe<MirroredTensor> MakeEagerTensor(
const std::shared_ptr<vm::EagerBlobObject> eager_blob_object, const Symbol<Device>& device,
const std::shared_ptr<TensorStorage> tensor_storage, bool requires_grad, bool is_leaf);
Maybe<MirroredTensor> AsMirroredTensor() override { return shared_from_this(); }
Maybe<ConsistentTensor> AsConsistentTensor() override { UNIMPLEMENTED_THEN_RETURN(); }
......@@ -457,7 +453,7 @@ class ConsistentTensor final : public TensorIf<ConsistentTensor>,
Maybe<vm::EagerBlobObject> eager_blob_object() const override {
return impl_->eager_blob_object();
}
Maybe<VmLocalDepObject> compute_local_dep_object() const override {
Maybe<LocalDepObject*> compute_local_dep_object() const override {
return impl_->compute_local_dep_object();
}
const TensorMeta& tensor_meta() const override { return *impl_->tensor_meta(); }
......
......@@ -24,7 +24,7 @@ limitations under the License.
#include "oneflow/core/framework/device.h"
#include "oneflow/core/framework/dtype.h"
#include "oneflow/core/eager/eager_blob_object.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
#include "oneflow/core/vm/vm_util.h"
#include "oneflow/core/operator/operator.h"
#include "oneflow/core/control/global_process_ctx.h"
......@@ -97,31 +97,20 @@ Maybe<void> EagerMirroredTensorImpl::UpdateTensorStorage() {
return Maybe<void>::Ok();
}
Maybe<VmLocalDepObject> EagerMirroredTensorImpl::compute_local_dep_object() const {
Maybe<LocalDepObject*> EagerMirroredTensorImpl::compute_local_dep_object() const {
return JUST(eager_blob_object())->compute_local_dep_object();
}
Maybe<void> EagerMirroredTensorImpl::InitEagerBlobObject(
const std::shared_ptr<MemoryCase>& mem_case) {
const auto& tensor_device = device();
CHECK_OR_RETURN(static_cast<bool>(tensor_device));
Maybe<void> EagerMirroredTensorImpl::InitEagerBlobObject(LocalDepObject* dep_object) {
CHECK_OR_RETURN(static_cast<bool>(device()));
const auto& mem_case = device()->mem_case();
const auto& mut_shape = std::const_pointer_cast<Shape>(tensor_meta()->shape_ptr());
const auto& eager_blob_object = std::make_shared<vm::EagerBlobObject>(
mem_case, mut_shape, dtype(), std::make_shared<vm::TensorBuffer>(),
tensor_device->parallel_desc_ptr());
mem_case, mut_shape, dtype(), std::make_shared<vm::TensorBuffer>(), dep_object);
JUST(set_eager_blob_object(eager_blob_object));
return Maybe<void>::Ok();
}
Maybe<void> EagerMirroredTensorImpl::InitEagerBlobObjectAndTensorStorage(
const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object,
const std::shared_ptr<TensorStorage>& tensor_storage) {
CHECK_OR_RETURN(eager_blob_object->tensor_buffer() == tensor_storage->buffer());
eager_blob_object_ = eager_blob_object;
tensor_storage_ = tensor_storage;
return Maybe<void>::Ok();
}
Maybe<void> EagerMirroredTensorImpl::set_eager_blob_object(
std::shared_ptr<vm::EagerBlobObject> eager_blob_object) {
eager_blob_object_ = eager_blob_object;
......@@ -238,7 +227,8 @@ Maybe<Shape> GetPhysicalShape(const Shape& logical_shape, const cfg::NdSbp& nd_s
std::make_shared<MirroredTensorMeta>(cur_rank_phy_shape, dtype, device);
auto cur_rank_phy_tensor_impl =
std::make_shared<EagerMirroredTensorImpl>(cur_rank_phy_tensor_meta, requires_grad, is_leaf);
JUST(cur_rank_phy_tensor_impl->InitEagerBlobObject(device->mem_case()));
const auto& dep_object = JUST(GetLocalDepObject(device));
JUST(cur_rank_phy_tensor_impl->InitEagerBlobObject(dep_object));
const auto& cur_rank_phy_tensor = std::make_shared<MirroredTensor>(cur_rank_phy_tensor_impl);
auto* tensor_impl =
new EagerConsistentTensorImpl(consistent_tensor_meta, cur_rank_phy_tensor->requires_grad(),
......
......@@ -32,7 +32,7 @@ limitations under the License.
namespace oneflow {
class MemoryCase;
class VmLocalDepObject;
class LocalDepObject;
namespace cfg {
......@@ -62,7 +62,7 @@ class TensorImpl {
// Getters valid only for EagerMirroredTensorImpl
virtual Maybe<vm::EagerBlobObject> eager_blob_object() const = 0;
virtual Maybe<VmLocalDepObject> compute_local_dep_object() const = 0;
virtual Maybe<LocalDepObject*> compute_local_dep_object() const = 0;
virtual Maybe<TensorStorage> tensor_storage() const { OF_UNIMPLEMENTED(); }
virtual Maybe<bool> has_eager_blob_object() const = 0;
virtual Maybe<const Stride> stride() const { OF_UNIMPLEMENTED(); }
......@@ -142,7 +142,7 @@ class ConsistentTensorImpl : public TensorImpl {
// Getters valid only for EagerMirroredTensorImpl
Maybe<vm::EagerBlobObject> eager_blob_object() const override { OF_UNIMPLEMENTED(); }
Maybe<VmLocalDepObject> compute_local_dep_object() const override { OF_UNIMPLEMENTED(); }
Maybe<LocalDepObject*> compute_local_dep_object() const override { OF_UNIMPLEMENTED(); }
Maybe<bool> has_eager_blob_object() const override { OF_UNIMPLEMENTED(); }
// Setters
......@@ -187,7 +187,7 @@ class LazyMirroredTensorImpl final : public MirroredTensorImpl {
// Getters valid only for EagerMirroredTensorImpl
Maybe<vm::EagerBlobObject> eager_blob_object() const override { OF_UNIMPLEMENTED(); }
Maybe<VmLocalDepObject> compute_local_dep_object() const override { OF_UNIMPLEMENTED(); }
Maybe<LocalDepObject*> compute_local_dep_object() const override { OF_UNIMPLEMENTED(); }
Maybe<TensorStorage> tensor_storage() const override { OF_UNIMPLEMENTED(); }
Maybe<bool> has_eager_blob_object() const override { OF_UNIMPLEMENTED(); }
Maybe<MirroredTensorImpl> detach() const override;
......@@ -214,7 +214,7 @@ class EagerMirroredTensorImpl final : public MirroredTensorImpl {
CHECK_OR_RETURN(eager_blob_object_);
return eager_blob_object_;
}
Maybe<VmLocalDepObject> compute_local_dep_object() const override;
Maybe<LocalDepObject*> compute_local_dep_object() const override;
Maybe<TensorStorage> tensor_storage() const override {
CHECK_OR_RETURN(eager_blob_object_);
return tensor_storage_;
......@@ -226,10 +226,7 @@ class EagerMirroredTensorImpl final : public MirroredTensorImpl {
// Setters
TensorStorage* mut_tensor_storage() { return tensor_storage_.get(); }
Maybe<void> InitEagerBlobObject(const std::shared_ptr<MemoryCase>& mem_case);
Maybe<void> InitEagerBlobObjectAndTensorStorage(
const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object,
const std::shared_ptr<TensorStorage>& tensor_storage);
Maybe<void> InitEagerBlobObject(LocalDepObject* dep_object);
Maybe<EagerMirroredTensorImpl*> mut_eager_mirrored_tensor_impl() override { return this; }
private:
......
......@@ -15,7 +15,7 @@ limitations under the License.
*/
#include "oneflow/core/framework/tensor_storage.h"
#include "oneflow/core/eager/eager_blob_object.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
#include "oneflow/core/framework/shut_down_util.h"
namespace oneflow {
......
......@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/vm/access_blob_arg_cb_phy_instr_operand.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
#include "oneflow/core/framework/tensor_storage.h"
#include "oneflow/core/object_msg/object_msg_list.h"
......@@ -24,26 +24,17 @@ namespace vm {
void AccessBlobArgCbPhyInstrOperand::ForEachConstMirroredObject(
const std::function<void(MirroredObject* infer, MirroredObject* compute)>& DoEach) const {
if (modifier_ == "const") {
auto* compute_local_dep_object = compute_local_dep_object_->mut_local_dep_object();
DoEach(nullptr, compute_local_dep_object->mut_mirrored_object());
}
if (modifier_ == "const") { DoEach(nullptr, compute_local_dep_object_->mut_mirrored_object()); }
}
void AccessBlobArgCbPhyInstrOperand::ForEachMutMirroredObject(
const std::function<void(MirroredObject* infer, MirroredObject* compute)>& DoEach) const {
if (modifier_ == "mut") {
auto* compute_local_dep_object = compute_local_dep_object_->mut_local_dep_object();
DoEach(nullptr, compute_local_dep_object->mut_mirrored_object());
}
if (modifier_ == "mut") { DoEach(nullptr, compute_local_dep_object_->mut_mirrored_object()); }
}
void AccessBlobArgCbPhyInstrOperand::ForEachMut2MirroredObject(
const std::function<void(MirroredObject* infer, MirroredObject* compute)>& DoEach) const {
if (modifier_ == "mut2") {
auto* compute_local_dep_object = compute_local_dep_object_->mut_local_dep_object();
DoEach(nullptr, compute_local_dep_object->mut_mirrored_object());
}
if (modifier_ == "mut2") { DoEach(nullptr, compute_local_dep_object_->mut_mirrored_object()); }
}
} // namespace vm
......
......@@ -21,7 +21,7 @@ limitations under the License.
namespace oneflow {
class VmLocalDepObject;
class LocalDepObject;
namespace one {
......@@ -36,7 +36,7 @@ class EagerBlobObject;
class AccessBlobArgCbPhyInstrOperand : public PhyInstrOperand {
public:
AccessBlobArgCbPhyInstrOperand(const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object,
const std::shared_ptr<VmLocalDepObject>& compute_local_dep_object,
LocalDepObject* compute_local_dep_object,
const std::function<void(uint64_t)>& callback,
const std::string& modifier)
: eager_blob_object_(eager_blob_object),
......@@ -62,7 +62,7 @@ class AccessBlobArgCbPhyInstrOperand : public PhyInstrOperand {
private:
std::shared_ptr<vm::EagerBlobObject> eager_blob_object_;
std::function<void(uint64_t)> callback_;
std::shared_ptr<VmLocalDepObject> compute_local_dep_object_;
LocalDepObject* compute_local_dep_object_;
const std::string modifier_;
};
......
......@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/vm/release_tensor_arg_phy_instr_operand.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
namespace oneflow {
......@@ -27,8 +27,7 @@ void ReleaseTensorArgPhyInstrOperand::ForEachConstMirroredObject(
void ReleaseTensorArgPhyInstrOperand::ForEachMutMirroredObject(
const std::function<void(MirroredObject* infer, MirroredObject* compute)>& DoEach) const {
auto* compute_local_dep_object = compute_local_dep_object_->mut_local_dep_object();
DoEach(nullptr, compute_local_dep_object->mut_mirrored_object());
DoEach(nullptr, compute_local_dep_object_->mut_mirrored_object());
}
void ReleaseTensorArgPhyInstrOperand::ForEachMut2MirroredObject(
......
......@@ -21,7 +21,8 @@ limitations under the License.
namespace oneflow {
class VmLocalDepObject;
class LocalDepObject;
namespace vm {
class EagerBlobObject;
......@@ -29,7 +30,7 @@ class EagerBlobObject;
class ReleaseTensorArgPhyInstrOperand : public PhyInstrOperand {
public:
ReleaseTensorArgPhyInstrOperand(const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object,
const std::shared_ptr<VmLocalDepObject>& compute_local_dep_object)
LocalDepObject* compute_local_dep_object)
: eager_blob_object_(eager_blob_object),
compute_local_dep_object_(compute_local_dep_object) {}
~ReleaseTensorArgPhyInstrOperand() override = default;
......@@ -49,7 +50,7 @@ class ReleaseTensorArgPhyInstrOperand : public PhyInstrOperand {
private:
std::shared_ptr<vm::EagerBlobObject> eager_blob_object_;
std::shared_ptr<VmLocalDepObject> compute_local_dep_object_;
LocalDepObject* compute_local_dep_object_;
};
} // namespace vm
......
......@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/vm/soft_sync_stream_phy_instr_operand.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/local_dep_object.h"
namespace oneflow {
......@@ -22,26 +22,17 @@ namespace vm {
void SoftSyncStreamPhyInstrOperand::ForEachConstMirroredObject(
const std::function<void(MirroredObject* infer, MirroredObject* compute)>& DoEach) const {
if (modifier_ == "const") {
auto* compute_local_dep_object = compute_local_dep_object_->mut_local_dep_object();
DoEach(nullptr, compute_local_dep_object->mut_mirrored_object());
}
if (modifier_ == "const") { DoEach(nullptr, compute_local_dep_object_->mut_mirrored_object()); }
}
void SoftSyncStreamPhyInstrOperand::ForEachMutMirroredObject(
const std::function<void(MirroredObject* infer, MirroredObject* compute)>& DoEach) const {
if (modifier_ == "mut") {
auto* compute_local_dep_object = compute_local_dep_object_->mut_local_dep_object();
DoEach(nullptr, compute_local_dep_object->mut_mirrored_object());
}
if (modifier_ == "mut") { DoEach(nullptr, compute_local_dep_object_->mut_mirrored_object()); }
}
void SoftSyncStreamPhyInstrOperand::ForEachMut2MirroredObject(
const std::function<void(MirroredObject* infer, MirroredObject* compute)>& DoEach) const {
if (modifier_ == "mut2") {
auto* compute_local_dep_object = compute_local_dep_object_->mut_local_dep_object();
DoEach(nullptr, compute_local_dep_object->mut_mirrored_object());
}
if (modifier_ == "mut2") { DoEach(nullptr, compute_local_dep_object_->mut_mirrored_object()); }
}
} // namespace vm
......
......@@ -21,13 +21,13 @@ limitations under the License.
namespace oneflow {
class VmLocalDepObject;
class LocalDepObject;
namespace vm {
class SoftSyncStreamPhyInstrOperand : public PhyInstrOperand {
public:
SoftSyncStreamPhyInstrOperand(const std::shared_ptr<VmLocalDepObject>& compute_local_dep_object,
SoftSyncStreamPhyInstrOperand(LocalDepObject* compute_local_dep_object,
const std::string& modifier)
: compute_local_dep_object_(compute_local_dep_object), modifier_(modifier) {}
~SoftSyncStreamPhyInstrOperand() = default;
......@@ -42,7 +42,7 @@ class SoftSyncStreamPhyInstrOperand : public PhyInstrOperand {
const std::function<void(MirroredObject* infer, MirroredObject* compute)>&) const override;
private:
std::shared_ptr<VmLocalDepObject> compute_local_dep_object_;
LocalDepObject* compute_local_dep_object_;
const std::string modifier_;
};
......
......@@ -22,6 +22,7 @@ limitations under the License.
#include "oneflow/core/common/util.h"
#include "oneflow/core/common/balanced_splitter.h"
#include "oneflow/core/common/spin_counter.h"
#include "oneflow/core/framework/device.h"
#include "oneflow/core/job/parallel_desc.h"
namespace oneflow {
......@@ -585,10 +586,10 @@ Maybe<void> VirtualMachine::Receive(InstructionMsgList* compute_instr_msg_list)
}
compute_instr_msg_list->MoveToDstBack(compute_instr_msg, &new_instr_msg_list);
}
static const int64_t kHighWaterMark = 500;
static const int64_t kLowWaterMark = 200;
const int64_t kHighWaterMark = GetInstructionHighWaterMark();
const int64_t kLowWaterMark = GetInstructionLowWaterMark();
if (*mut_flying_instruction_cnt() > kHighWaterMark) {
JUST(Global<ForeignLockHelper>::Get()->WithScopedRelease([this]() -> Maybe<void> {
JUST(Global<ForeignLockHelper>::Get()->WithScopedRelease([&, this]() -> Maybe<void> {
const auto& NeedSpin = [&] { return *mut_flying_instruction_cnt() > kLowWaterMark; };
while (true) {
int64_t last_cnt = *mut_flying_instruction_cnt();
......
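
Taken together, a call site now obtains a pooled dep object with `GetLocalDepObject(device)` and hands the raw pointer into `EagerBlobObject` via `InitEagerBlobObject`, and the pool size per device type follows the policy added in `Device::instr_local_dep_object_pool_size()`. Below is a standalone restatement of that sizing policy for quick reference; `PoolSizeForDeviceType` and the `main` are illustrative, while the 500 and 2 constants are the values in the diff above.

```cpp
#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Restates the sizing policy of Device::instr_local_dep_object_pool_size():
// compute streams get the instruction high-water mark, transfer/nccl streams
// get a two-entry double buffer.
std::size_t PoolSizeForDeviceType(const std::string& type) {
  constexpr std::size_t kHighWaterMark = 500;  // GetInstructionHighWaterMark() in the diff
  constexpr std::size_t kDoubleBufferPoolSize = 2;
  static const std::unordered_map<std::string, std::size_t> type2pool_size{
      {"cpu", kHighWaterMark},             {"cuda", kHighWaterMark},
      {"gpu", kHighWaterMark},             {"cuda_h2d", kDoubleBufferPoolSize},
      {"cuda_d2h", kDoubleBufferPoolSize}, {"nccl", kDoubleBufferPoolSize},
  };
  const auto it = type2pool_size.find(type);
  if (it == type2pool_size.end()) { throw std::out_of_range("unknown device type: " + type); }
  return it->second;
}

int main() {
  std::cout << PoolSizeForDeviceType("cuda") << " " << PoolSizeForDeviceType("nccl") << "\n";
  return 0;  // prints "500 2"
}
```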