提交 c9a3d3b6 编写于 作者: G guru4elephant 提交者: xjqbest

Merge pull request #16652 from xjqbest/dataset_merge_develop

fix dataset bug
上级 1237dfa6
...@@ -242,6 +242,11 @@ void InMemoryDataFeed<T>::SetTrainerNum(int trainer_num) { ...@@ -242,6 +242,11 @@ void InMemoryDataFeed<T>::SetTrainerNum(int trainer_num) {
trainer_num_ = trainer_num; trainer_num_ = trainer_num;
} }
// Sets fleet_send_batch_size_. GlobalShuffle() reads this value to size its
// per-trainer send buffers and to decide how often buffered instances are
// flushed, so it should be set before GlobalShuffle() runs.
template <typename T>
void InMemoryDataFeed<T>::SetFleetSendBatchSize(int64_t size) {
  this->fleet_send_batch_size_ = size;
}
template <typename T> template <typename T>
void InMemoryDataFeed<T>::PutInsToChannel(const std::string& ins_str) { void InMemoryDataFeed<T>::PutInsToChannel(const std::string& ins_str) {
#ifdef _LINUX #ifdef _LINUX
...@@ -361,8 +366,13 @@ void InMemoryDataFeed<T>::GlobalShuffle() { ...@@ -361,8 +366,13 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
VLOG(3) << "GlobalShuffle() begin, thread_id=" << thread_id_; VLOG(3) << "GlobalShuffle() begin, thread_id=" << thread_id_;
auto fleet_ptr = FleetWrapper::GetInstance(); auto fleet_ptr = FleetWrapper::GetInstance();
std::vector<std::vector<T*>> send_vec(trainer_num_); std::vector<std::vector<T*>> send_vec(trainer_num_);
std::vector<int> send_index(trainer_num_);
uint64_t reserve_len = fleet_send_batch_size_ / trainer_num_;
for (auto& vec : send_vec) { for (auto& vec : send_vec) {
vec.reserve(fleet_send_batch_size_); vec.reserve(reserve_len);
}
for (int i = 0; i < trainer_num_; ++i) {
send_index[i] = i;
} }
std::vector<std::future<int32_t>> total_status; std::vector<std::future<int32_t>> total_status;
auto interval = GetMemoryDataInterval(); auto interval = GetMemoryDataInterval();
...@@ -375,7 +385,10 @@ void InMemoryDataFeed<T>::GlobalShuffle() { ...@@ -375,7 +385,10 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
int64_t node_id = random_num % trainer_num_; int64_t node_id = random_num % trainer_num_;
send_vec[node_id].push_back(&((*memory_data_)[i])); send_vec[node_id].push_back(&((*memory_data_)[i]));
if (i % fleet_send_batch_size_ == 0 && i != 0) { if (i % fleet_send_batch_size_ == 0 && i != 0) {
for (int j = 0; j < send_vec.size(); ++j) { // shuffle the sequence of sending to avoid network timeout error
std::random_shuffle(send_index.begin(), send_index.end());
for (int index = 0; index < send_index.size(); ++index) {
int j = send_index[index];
std::string send_str; std::string send_str;
SerializeIns(send_vec[j], &send_str); SerializeIns(send_vec[j], &send_str);
VLOG(3) << "send str_length=" << send_str.length() VLOG(3) << "send str_length=" << send_str.length()
...@@ -388,7 +401,10 @@ void InMemoryDataFeed<T>::GlobalShuffle() { ...@@ -388,7 +401,10 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
} }
} }
} }
for (int j = 0; j < send_vec.size(); ++j) { // shuffle the sequence of sending to avoid network timeout error
std::random_shuffle(send_index.begin(), send_index.end());
for (int index = 0; index < send_index.size(); ++index) {
int j = send_index[index];
if (send_vec[j].size() != 0) { if (send_vec[j].size() != 0) {
std::string send_str; std::string send_str;
SerializeIns(send_vec[j], &send_str); SerializeIns(send_vec[j], &send_str);
......
...@@ -94,6 +94,8 @@ class DataFeed { ...@@ -94,6 +94,8 @@ class DataFeed {
virtual void SetThreadNum(int thread_num) {} virtual void SetThreadNum(int thread_num) {}
// This function will do nothing by default // This function will do nothing by default
virtual void SetTrainerNum(int trainer_num) {} virtual void SetTrainerNum(int trainer_num) {}
// This function will do nothing by default
virtual void SetFleetSendBatchSize(int64_t size) {}
virtual void SetFileListMutex(std::mutex* mutex) { virtual void SetFileListMutex(std::mutex* mutex) {
mutex_for_pick_file_ = mutex; mutex_for_pick_file_ = mutex;
} }
...@@ -212,6 +214,7 @@ class InMemoryDataFeed : public PrivateQueueDataFeed<T> { ...@@ -212,6 +214,7 @@ class InMemoryDataFeed : public PrivateQueueDataFeed<T> {
virtual void SetThreadId(int thread_id); virtual void SetThreadId(int thread_id);
virtual void SetThreadNum(int thread_num); virtual void SetThreadNum(int thread_num);
virtual void SetTrainerNum(int trainer_num); virtual void SetTrainerNum(int trainer_num);
virtual void SetFleetSendBatchSize(int64_t size);
virtual void PutInsToChannel(const std::string& ins_str); virtual void PutInsToChannel(const std::string& ins_str);
virtual void FillMemoryDataToChannel(); virtual void FillMemoryDataToChannel();
virtual void FillChannelToMemoryData(); virtual void FillChannelToMemoryData();
......
...@@ -64,6 +64,17 @@ void DatasetImpl<T>::SetTrainerNum(int trainer_num) { ...@@ -64,6 +64,17 @@ void DatasetImpl<T>::SetTrainerNum(int trainer_num) {
} }
} }
// Only relevant for distributed runs that perform a global shuffle: set this
// before the global shuffle starts.
// Make sure CreateReaders has been called before SetFleetSendBatchSize; the
// value is stored on the dataset and forwarded to each entry of readers_.
template <typename T>
void DatasetImpl<T>::SetFleetSendBatchSize(int64_t size) {
  this->fleet_send_batch_size_ = size;
  for (const auto& feed : this->readers_) {
    feed->SetFleetSendBatchSize(size);
  }
}
template <typename T> template <typename T>
void DatasetImpl<T>::SetHdfsConfig(const std::string& fs_name, void DatasetImpl<T>::SetHdfsConfig(const std::string& fs_name,
const std::string& fs_ugi) { const std::string& fs_ugi) {
......
...@@ -47,6 +47,8 @@ class Dataset { ...@@ -47,6 +47,8 @@ class Dataset {
virtual void SetThreadNum(int thread_num) = 0; virtual void SetThreadNum(int thread_num) = 0;
// set workers' num // set workers' num
virtual void SetTrainerNum(int trainer_num) = 0; virtual void SetTrainerNum(int trainer_num) = 0;
// set fleet send batch size
virtual void SetFleetSendBatchSize(int64_t size) = 0;
// set fs name and ugi // set fs name and ugi
virtual void SetHdfsConfig(const std::string& fs_name, virtual void SetHdfsConfig(const std::string& fs_name,
const std::string& fs_ugi) = 0; const std::string& fs_ugi) = 0;
...@@ -59,6 +61,8 @@ class Dataset { ...@@ -59,6 +61,8 @@ class Dataset {
virtual int GetThreadNum() = 0; virtual int GetThreadNum() = 0;
// get worker num // get worker num
virtual int GetTrainerNum() = 0; virtual int GetTrainerNum() = 0;
// get fleet send batch size
virtual int64_t GetFleetSendBatchSize() = 0;
// get hdfs config // get hdfs config
virtual std::pair<std::string, std::string> GetHdfsConfig() = 0; virtual std::pair<std::string, std::string> GetHdfsConfig() = 0;
// get data feed desc // get data feed desc
...@@ -98,6 +102,7 @@ class DatasetImpl : public Dataset { ...@@ -98,6 +102,7 @@ class DatasetImpl : public Dataset {
virtual void SetFileList(const std::vector<std::string>& filelist); virtual void SetFileList(const std::vector<std::string>& filelist);
virtual void SetThreadNum(int thread_num); virtual void SetThreadNum(int thread_num);
virtual void SetTrainerNum(int trainer_num); virtual void SetTrainerNum(int trainer_num);
virtual void SetFleetSendBatchSize(int64_t size);
virtual void SetHdfsConfig(const std::string& fs_name, virtual void SetHdfsConfig(const std::string& fs_name,
const std::string& fs_ugi); const std::string& fs_ugi);
virtual void SetDataFeedDesc(const std::string& data_feed_desc_str); virtual void SetDataFeedDesc(const std::string& data_feed_desc_str);
...@@ -105,6 +110,7 @@ class DatasetImpl : public Dataset { ...@@ -105,6 +110,7 @@ class DatasetImpl : public Dataset {
virtual const std::vector<std::string>& GetFileList() { return filelist_; } virtual const std::vector<std::string>& GetFileList() { return filelist_; }
virtual int GetThreadNum() { return thread_num_; } virtual int GetThreadNum() { return thread_num_; }
virtual int GetTrainerNum() { return trainer_num_; } virtual int GetTrainerNum() { return trainer_num_; }
virtual int64_t GetFleetSendBatchSize() { return fleet_send_batch_size_; }
virtual std::pair<std::string, std::string> GetHdfsConfig() { virtual std::pair<std::string, std::string> GetHdfsConfig() {
return std::make_pair(fs_name_, fs_ugi_); return std::make_pair(fs_name_, fs_ugi_);
} }
...@@ -137,6 +143,7 @@ class DatasetImpl : public Dataset { ...@@ -137,6 +143,7 @@ class DatasetImpl : public Dataset {
std::string fs_name_; std::string fs_name_;
std::string fs_ugi_; std::string fs_ugi_;
unsigned int rand_seed; unsigned int rand_seed;
int64_t fleet_send_batch_size_;
}; };
// use std::vector<MultiSlotType> as data type // use std::vector<MultiSlotType> as data type
......
...@@ -237,6 +237,7 @@ void FleetWrapper::PushDenseParamSync( ...@@ -237,6 +237,7 @@ void FleetWrapper::PushDenseParamSync(
std::vector<paddle::ps::Region> regions; std::vector<paddle::ps::Region> regions;
for (auto& t : var_names) { for (auto& t : var_names) {
Variable* var = scope.FindVar(t); Variable* var = scope.FindVar(t);
CHECK(var != nullptr) << "var[" << t << "] not found";
LoDTensor* tensor = var->GetMutable<LoDTensor>(); LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* g = tensor->mutable_data<float>(place); float* g = tensor->mutable_data<float>(place);
paddle::ps::Region reg(g, tensor->numel()); paddle::ps::Region reg(g, tensor->numel());
......
...@@ -126,7 +126,7 @@ static int shell_popen_fork_internal(const char* real_cmd, bool do_read, ...@@ -126,7 +126,7 @@ static int shell_popen_fork_internal(const char* real_cmd, bool do_read,
} }
close_open_fds_internal(); close_open_fds_internal();
if (execl("/bin/sh", "sh", "-c", real_cmd, NULL) < 0) { if (execl("/bin/bash", "bash", "-c", real_cmd, NULL) < 0) {
return -1; return -1;
} }
exit(127); exit(127);
......
...@@ -50,11 +50,15 @@ void BindDataset(py::module* m) { ...@@ -50,11 +50,15 @@ void BindDataset(py::module* m) {
.def("set_filelist", &framework::Dataset::SetFileList) .def("set_filelist", &framework::Dataset::SetFileList)
.def("set_thread_num", &framework::Dataset::SetThreadNum) .def("set_thread_num", &framework::Dataset::SetThreadNum)
.def("set_trainer_num", &framework::Dataset::SetTrainerNum) .def("set_trainer_num", &framework::Dataset::SetTrainerNum)
.def("set_fleet_send_batch_size",
&framework::Dataset::SetFleetSendBatchSize)
.def("set_hdfs_config", &framework::Dataset::SetHdfsConfig) .def("set_hdfs_config", &framework::Dataset::SetHdfsConfig)
.def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc) .def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc)
.def("get_filelist", &framework::Dataset::GetFileList) .def("get_filelist", &framework::Dataset::GetFileList)
.def("get_thread_num", &framework::Dataset::GetThreadNum) .def("get_thread_num", &framework::Dataset::GetThreadNum)
.def("get_trainer_num", &framework::Dataset::GetTrainerNum) .def("get_trainer_num", &framework::Dataset::GetTrainerNum)
.def("get_fleet_send_batch_size",
&framework::Dataset::GetFleetSendBatchSize)
.def("get_hdfs_config", &framework::Dataset::GetHdfsConfig) .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig)
.def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc) .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc)
.def("register_client2client_msg_handler", .def("register_client2client_msg_handler",
......
...@@ -236,11 +236,13 @@ class InMemoryDataset(DatasetBase): ...@@ -236,11 +236,13 @@ class InMemoryDataset(DatasetBase):
fleet: fleet singleton. Default None. fleet: fleet singleton. Default None.
""" """
trainer_num = 1 trainer_num = 1
fleet_send_batch_size = 80000
if fleet is not None: if fleet is not None:
fleet.fleet_instance.role_maker_._barrier_worker() fleet.fleet_instance.role_maker_._barrier_worker()
trainer_num = fleet.worker_num() trainer_num = fleet.worker_num()
self.dataset.register_client2client_msg_handler() self.dataset.register_client2client_msg_handler()
self.dataset.set_trainer_num(trainer_num) self.dataset.set_trainer_num(trainer_num)
self.dataset.set_fleet_send_batch_size(fleet_send_batch_size)
if fleet is not None: if fleet is not None:
fleet.fleet_instance.role_maker_._barrier_worker() fleet.fleet_instance.role_maker_._barrier_worker()
self.dataset.global_shuffle() self.dataset.global_shuffle()
......
...@@ -712,7 +712,7 @@ class Executor(object): ...@@ -712,7 +712,7 @@ class Executor(object):
if dataset == None: if dataset == None:
raise RuntimeError("dataset is needed and should be initialized") raise RuntimeError("dataset is needed and should be initialized")
if self.place == paddle.fluid.CUDAPlace(): if not isinstance(self.place, core.CPUPlace):
raise RuntimeError("infer_from_dataset is verified on CPUPlace" raise RuntimeError("infer_from_dataset is verified on CPUPlace"
"We will open CUDAPlace in the future") "We will open CUDAPlace in the future")
...@@ -796,7 +796,7 @@ class Executor(object): ...@@ -796,7 +796,7 @@ class Executor(object):
if dataset == None: if dataset == None:
raise RuntimeError("dataset is need and should be initialized") raise RuntimeError("dataset is need and should be initialized")
if self.place == paddle.fluid.CUDAPlace(): if not isinstance(self.place, core.CPUPlace):
raise RuntimeError("train_from_dataset is verified on CPUPlace" raise RuntimeError("train_from_dataset is verified on CPUPlace"
"We will open CUDAPlace in the future") "We will open CUDAPlace in the future")
......
...@@ -123,18 +123,25 @@ class Fleet(object): ...@@ -123,18 +123,25 @@ class Fleet(object):
print("You should run DistributedOptimizer.minimize() first") print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1) sys.exit(-1)
def init_worker(self, programs): def init_worker(self, programs, scopes=None):
""" """
init_worker(): will be called by user. When a user knows current process is_worker(), he/she init_worker(): will be called by user. When a user knows current process is_worker(), he/she
should call init_worker() to initialize global information about worker and connect should call init_worker() to initialize global information about worker and connect
worker with pserver. worker with pserver. You should run startup program before init_worker.
Args: Args:
programs(Program|list): a Program or a list of Programs programs(Program|list): a Program or a list of Programs
scopes(Scope|list): a Scope or a list of Scopes, default None.
""" """
if not isinstance(programs, list): if not isinstance(programs, list):
programs = [programs] programs = [programs]
if scopes is None:
scopes = [fluid.global_scope()] * len(programs)
if len(scopes) != len(programs):
print(
"You should make sure len(scopes) == len(programs) or set scopes None"
)
sys.exit(-1)
if self._opt_info: if self._opt_info:
if "fleet_desc" in self._opt_info: if "fleet_desc" in self._opt_info:
self._dist_desc_str = text_format.MessageToString( self._dist_desc_str = text_format.MessageToString(
...@@ -160,7 +167,7 @@ class Fleet(object): ...@@ -160,7 +167,7 @@ class Fleet(object):
self.role_maker_._barrier_worker() self.role_maker_._barrier_worker()
if self.role_maker_._is_first_worker(): if self.role_maker_._is_first_worker():
tables = self._dist_desc.trainer_param.dense_table tables = self._dist_desc.trainer_param.dense_table
for prog in programs: for prog, scope in zip(programs, scopes):
prog_id = str(id(prog)) prog_id = str(id(prog))
prog_conf = self._opt_info['program_configs'][prog_id] prog_conf = self._opt_info['program_configs'][prog_id]
prog_tables = {} prog_tables = {}
...@@ -174,10 +181,16 @@ class Fleet(object): ...@@ -174,10 +181,16 @@ class Fleet(object):
continue continue
var_name_list = [] var_name_list = []
for i in range(0, len(table.dense_variable_name)): for i in range(0, len(table.dense_variable_name)):
var_name_list.append(table.dense_variable_name[i]) var_name = table.dense_variable_name[i]
self._fleet_ptr.init_model(prog.desc, if scope.find_var(var_name) is None:
int(table.table_id), print("var " + var_name +
var_name_list) " not found in scope, " +
"you should run startup program first")
sys.exit(-1)
var_name_list.append(var_name)
self._fleet_ptr.init_model(scope,
int(table.table_id),
var_name_list)
# barrier for init model done # barrier for init model done
self.role_maker_._barrier_worker() self.role_maker_._barrier_worker()
else: else:
......
...@@ -29,7 +29,6 @@ class TestDataset(unittest.TestCase): ...@@ -29,7 +29,6 @@ class TestDataset(unittest.TestCase):
def test_dataset_create(self): def test_dataset_create(self):
""" Testcase for dataset create. """ """ Testcase for dataset create. """
return
try: try:
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
except: except:
...@@ -48,7 +47,6 @@ class TestDataset(unittest.TestCase): ...@@ -48,7 +47,6 @@ class TestDataset(unittest.TestCase):
def test_dataset_config(self): def test_dataset_config(self):
""" Testcase for dataset configuration. """ """ Testcase for dataset configuration. """
return
dataset = fluid.core.Dataset("MultiSlotDataset") dataset = fluid.core.Dataset("MultiSlotDataset")
dataset.set_thread_num(12) dataset.set_thread_num(12)
dataset.set_filelist(["a.txt", "b.txt", "c.txt"]) dataset.set_filelist(["a.txt", "b.txt", "c.txt"])
...@@ -75,7 +73,6 @@ class TestDataset(unittest.TestCase): ...@@ -75,7 +73,6 @@ class TestDataset(unittest.TestCase):
""" """
Testcase for InMemoryDataset from create to run. Testcase for InMemoryDataset from create to run.
""" """
return
with open("test_in_memory_dataset_run_a.txt", "w") as f: with open("test_in_memory_dataset_run_a.txt", "w") as f:
data = "1 1 2 3 3 4 5 5 5 5 1 1\n" data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
data += "1 2 2 3 4 4 6 6 6 6 1 2\n" data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
...@@ -112,9 +109,10 @@ class TestDataset(unittest.TestCase): ...@@ -112,9 +109,10 @@ class TestDataset(unittest.TestCase):
for i in range(2): for i in range(2):
try: try:
exe.train_from_dataset(fluid.default_main_program(), dataset) exe.train_from_dataset(fluid.default_main_program(), dataset)
except: except ImportError as e:
#self.assertTrue(False)
pass pass
except Exception as e:
self.assertTrue(False)
os.remove("./test_in_memory_dataset_run_a.txt") os.remove("./test_in_memory_dataset_run_a.txt")
os.remove("./test_in_memory_dataset_run_b.txt") os.remove("./test_in_memory_dataset_run_b.txt")
...@@ -123,7 +121,6 @@ class TestDataset(unittest.TestCase): ...@@ -123,7 +121,6 @@ class TestDataset(unittest.TestCase):
""" """
Testcase for QueueDataset from create to run. Testcase for QueueDataset from create to run.
""" """
return
with open("test_queue_dataset_run_a.txt", "w") as f: with open("test_queue_dataset_run_a.txt", "w") as f:
data = "1 1 2 3 3 4 5 5 5 5 1 1\n" data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
data += "1 2 2 3 4 4 6 6 6 6 1 2\n" data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
...@@ -156,15 +153,14 @@ class TestDataset(unittest.TestCase): ...@@ -156,15 +153,14 @@ class TestDataset(unittest.TestCase):
for i in range(2): for i in range(2):
try: try:
exe.train_from_dataset(fluid.default_main_program(), dataset) exe.train_from_dataset(fluid.default_main_program(), dataset)
except: except ImportError as e:
#self.assertTrue(False)
pass pass
except Exception as e:
self.assertTrue(False)
os.remove("./test_queue_dataset_run_a.txt") os.remove("./test_queue_dataset_run_a.txt")
os.remove("./test_queue_dataset_run_b.txt") os.remove("./test_queue_dataset_run_b.txt")
if __name__ == '__main__': if __name__ == '__main__':
#unittest.main() unittest.main()
import sys
sys.exit(0)
...@@ -12,6 +12,9 @@ ...@@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from .trainer_desc import MultiTrainer, DistMultiTrainer
from .device_worker import Hogwild, DownpourSGD
__all__ = ["TrainerFactory"] __all__ = ["TrainerFactory"]
...@@ -20,8 +23,6 @@ class TrainerFactory(object): ...@@ -20,8 +23,6 @@ class TrainerFactory(object):
pass pass
def _create_trainer(self, opt_info=None): def _create_trainer(self, opt_info=None):
from .trainer_desc import MultiTrainer, DistMultiTrainer
from .device_worker import Hogwild, DownpourSGD
trainer = None trainer = None
device_worker = None device_worker = None
if opt_info == None: if opt_info == None:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册