未验证 提交 be43676e 编写于 作者: qq_22305325's avatar qq_22305325 提交者: GitHub

construct_consistentTensor_by_numpy (#6468)

* construct_consistentTensor_by_numpy

* fix typo

* fix error

* auto format by CI

* update get-oneflow
Co-authored-by: Nliufengwei <2472937968@qq.com>
Co-authored-by: Noneflow-ci-bot <ci-bot@oneflow.org>
Co-authored-by: Ntsai <jackalcooper@gmail.com>
Co-authored-by: Noneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
上级 944a87ce
......@@ -87,9 +87,8 @@ class ConsistentTensorWithDataFunctor {
const auto& other = JUST(PyUnpackTensor(data));
return MakeTensorFromOtherTensor(other, dtype, placement, sbp_tuple, requires_grad);
// TODO(): Construct consistent tensor from sequence or numpy ndarray.
return Error::RuntimeError()
<< "Can not construct consistent tensor from sequence or numpy array currently.";
// Make consistent tensor from python sequence or numpy array.
return MakeConsistentTensorFromData(data, dtype, placement, sbp_tuple, requires_grad);
......@@ -163,9 +162,8 @@ class ConsistentTensorWithDataCtorFunctor {
return MakeTensorFromOtherTensor(other, dtype, placement, sbp_tuple,
// TODO(): Construct consistent tensor from sequence or numpy ndarray.
return Error::RuntimeError()
<< "Can not construct consistent tensor from sequence or numpy array currently.";
// Make consistent tensor from python sequence or numpy array.
return MakeConsistentTensorFromData(data, dtype, placement, sbp_tuple, /*requires_grad=*/false);
......@@ -23,6 +23,9 @@ limitations under the License.
#include "oneflow/core/framework/nd_sbp.h"
#include "oneflow/core/functional/functional.h"
#include "oneflow/extension/python/numpy.h"
#include "oneflow/core/framework/transport_util.h"
#include "oneflow/core/job/rank_group_scope.h"
#include "oneflow/core/common/decorator.h"
namespace py = pybind11;
......@@ -160,6 +163,120 @@ Maybe<Tensor> MakeLocalTensorFromData(PyObject* data, const Optional<Symbol<DTyp
return tensor;
namespace {
Maybe<Symbol<cfg::NdSbp>> GetAllBroadcastNdSbp(size_t ndim) {
cfg::NdSbp broadcast_nd_sbp;
for (size_t i = 0; i < ndim; ++i) {
return SymbolOf(broadcast_nd_sbp);
auto* CachedGetAllBroadcastNdSbp = DECORATE(&GetAllBroadcastNdSbp, ThreadLocal);
template<typename T>
bool CheckVecEqual(size_t size, const T* in0, const T* in1) {
for (size_t i = 0; i < size; ++i) {
if (*(in0 + i) != *(in1 + i)) { return false; }
return true;
} // namespace
template<typename T>
Maybe<void> DataConsistencyCheck(py::array_t<T> array, size_t elem_cnt,
Symbol<ParallelDesc> placement) {
const auto& rank_group = JUST(RankGroup::New(placement));
size_t data_size = elem_cnt * sizeof(T);
TransportToken transport_token = JUST(TransportToken::NewTransportToken(kTransportTokenTypeData));
py::array contiguous_array = py::reinterpret_steal<py::array>(reinterpret_cast<PyObject*>(
py::buffer_info buf = contiguous_array.request();
T* buf_ptr = (T*)buf.ptr;
size_t array_size = buf.size;
CHECK_EQ_OR_RETURN(array_size, elem_cnt);
std::vector<T> recv_buffer(elem_cnt);
T* recv_ptr = recv_buffer.data();
NaiveAsyncTransportCtx ctx(
[&](void** buffer, std::size_t* size, std::function<void()>* Cb) -> Maybe<void> {
*buffer = reinterpret_cast<void*>(buf_ptr);
*size = data_size;
*Cb = [] {};
return Maybe<void>::Ok();
[&](void** buffer, std::size_t* size, std::function<void()>* Cb) -> Maybe<void> {
*buffer = recv_ptr;
*size = data_size;
*Cb = [] {};
return Maybe<void>::Ok();
JUST(TransportUtil::SendToNextRankInRing(rank_group, transport_token, &ctx));
JUST(TransportUtil::ReceiveFromPrevRankInRing(rank_group, transport_token, &ctx));
JUST(TransportUtil::WaitUntilDoneOrTimeout(ctx, TransportUtil::TimeoutSeconds()));
CHECK_OR_RETURN(CheckVecEqual(elem_cnt, buf_ptr, recv_ptr))
<< "Each rank must have same input sequence or numpy array";
return Maybe<void>::Ok();
#define MAKE_SWITCH_ENTRY(func_name, dtype) func_name<dtype>
Maybe<Tensor> MakeConsistentTensorFromData(PyObject* data, const Optional<Symbol<DType>>& dtype,
Symbol<ParallelDesc> placement,
const std::vector<Symbol<cfg::SbpParallel>>& sbp_tuple,
bool requires_grad) {
auto* np_arr_pyobject = PyArray_FromAny(data, nullptr, 0, 0, NPY_ARRAY_DEFAULT, nullptr);
if (!np_arr_pyobject) {
return Error::RuntimeError() << "Can not convert input data to a numpy array.";
// transfer the ownership to np_arr_raii so that the ref count
// can be decreased automatically when function exits either normally or abnormally
auto np_arr_raii = py::reinterpret_steal<py::array>(np_arr_pyobject);
auto* np_arr = reinterpret_cast<PyArrayObject*>(np_arr_pyobject);
const npy_intp* dims_ptr = PyArray_SHAPE(np_arr);
const Shape shape(DimVector(dims_ptr, dims_ptr + PyArray_NDIM(np_arr)));
DataType data_type = JUST(numpy::GetOFDataTypeFromNpArray(np_arr));
JUST(SwitchDataConsistencyCheck(SwitchCase(data_type), np_arr_raii, shape.elem_cnt(), placement));
const std::string& device_tag = placement->device_tag();
Symbol<Device> device;
if (device_tag == "cpu") {
device = JUST(Device::New("cpu"));
} else {
device = JUST(Device::New("cuda"));
std::shared_ptr<Tensor> local_tensor =
JUST(functional::Empty(shape, JUST(DType::Get(data_type)), device));
JUST(SwitchCopyMirroredTensorFromUntypedArray(SwitchCase(data_type), local_tensor, np_arr_raii));
// Cast to float if data is double sequence, rather than numpy array.
Symbol<DType> dtype_;
if (dtype) {
dtype_ = JUST(dtype);
} else if (!dtype && data_type == DataType::kDouble && !PyArray_Check(data)) {
dtype_ = DType::Float();
if (dtype_) { local_tensor = JUST(functional::Cast(local_tensor, dtype_)); }
size_t sbp_dims = sbp_tuple.size();
Symbol<cfg::NdSbp> broadcast_nd_sbp = JUST(CachedGetAllBroadcastNdSbp(sbp_dims));
std::shared_ptr<Tensor> broadcast_tensor = JUST(functional::LocalToConsistent(
local_tensor, placement, *JUST(GetSbpList(broadcast_nd_sbp)), shape, dtype_));
std::vector<Symbol<cfg::SbpParallel>> grad_sbp_tuple;
return JUST(functional::ToConsistent(broadcast_tensor, placement, sbp_tuple, grad_sbp_tuple));
Maybe<Tensor> MakeTensorFromOtherTensor(const std::shared_ptr<Tensor>& other) {
if (other->is_local()) {
const Symbol<Device>& device = JUST(other->device());
......@@ -76,6 +76,11 @@ Maybe<py::tuple> TensorGetPyTupleOfSbp(const Tensor& tensor);
Maybe<Tensor> MakeLocalTensorFromData(PyObject* data, const Optional<Symbol<DType>>& dtype,
const Optional<Symbol<Device>>& device, bool requires_grad);
Maybe<Tensor> MakeConsistentTensorFromData(PyObject* data, const Optional<Symbol<DType>>& dtype,
Symbol<ParallelDesc> placement,
const std::vector<Symbol<cfg::SbpParallel>>& sbp_tuple,
bool requires_grad);
Maybe<Tensor> MakeTensorFromOtherTensor(const std::shared_ptr<Tensor>& other);
Maybe<Tensor> MakeTensorFromOtherTensor(const std::shared_ptr<Tensor>& other,
......@@ -1476,6 +1476,23 @@ class TestTensor(flow.unittest.TestCase):
y = input.floor_divide(other)
return y
def test_construct_consistent_tensor_by_numpy(test_case):
x = np.ones((4, 4), dtype=np.int32)
placement = flow.placement("cuda", {0: [0, 1, 2, 3]})
y = flow.tensor(
test_case.assertTrue(y.dtype == flow.float32)
np.allclose(y.to_local().numpy(), np.ones((1, 4), dtype=np.float32))
test_case.assertEqual(y.placement, placement)
if __name__ == "__main__":
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册