diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc
index e4e9861e37a4334220d5e39a5b44afafd668b7c3..b5f7e6c22405d6928f0e423458d6cd720f2d09a8 100644
--- a/paddle/fluid/framework/data_feed.cc
+++ b/paddle/fluid/framework/data_feed.cc
@@ -242,6 +242,11 @@ void InMemoryDataFeed<T>::SetTrainerNum(int trainer_num) {
   trainer_num_ = trainer_num;
 }
 
+template <typename T>
+void InMemoryDataFeed<T>::SetFleetSendBatchSize(int64_t size) {
+  fleet_send_batch_size_ = size;
+}
+
 template <typename T>
 void InMemoryDataFeed<T>::PutInsToChannel(const std::string& ins_str) {
 #ifdef _LINUX
@@ -361,8 +366,13 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
   VLOG(3) << "GlobalShuffle() begin, thread_id=" << thread_id_;
   auto fleet_ptr = FleetWrapper::GetInstance();
   std::vector<std::vector<T*>> send_vec(trainer_num_);
+  std::vector<int> send_index(trainer_num_);
+  uint64_t reserve_len = fleet_send_batch_size_ / trainer_num_;
   for (auto& vec : send_vec) {
-    vec.reserve(fleet_send_batch_size_);
+    vec.reserve(reserve_len);
+  }
+  for (int i = 0; i < trainer_num_; ++i) {
+    send_index[i] = i;
   }
   std::vector<std::future<int32_t>> total_status;
   auto interval = GetMemoryDataInterval();
@@ -375,7 +385,10 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
     int64_t node_id = random_num % trainer_num_;
     send_vec[node_id].push_back(&((*memory_data_)[i]));
     if (i % fleet_send_batch_size_ == 0 && i != 0) {
-      for (int j = 0; j < send_vec.size(); ++j) {
+      // shuffle the send order so workers do not all hit the same trainer at once (avoids network timeouts)
+      std::random_shuffle(send_index.begin(), send_index.end());
+      for (int index = 0; index < send_index.size(); ++index) {
+        int j = send_index[index];
         std::string send_str;
         SerializeIns(send_vec[j], &send_str);
         VLOG(3) << "send str_length=" << send_str.length()
@@ -388,7 +401,10 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
       }
     }
   }
-  for (int j = 0; j < send_vec.size(); ++j) {
+  // shuffle the send order so workers do not all hit the same trainer at once (avoids network timeouts)
+  std::random_shuffle(send_index.begin(), send_index.end());
+  for (int index = 0; index < send_index.size(); ++index) {
+    int j = send_index[index];
     if (send_vec[j].size() != 0) {
       std::string send_str;
       SerializeIns(send_vec[j], &send_str);
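
GlobalShuffle buckets each instance to a destination trainer by `random_num % trainer_num_` and flushes the buckets every `fleet_send_batch_size_` instances. Before this patch every worker flushed the buckets in the same fixed order (trainer 0 first, then 1, ...), so all workers sent to the same trainer at the same moment and requests timed out; shuffling `send_index` staggers the destinations. A minimal Python sketch of the randomized flush, where `send_fn` is a hypothetical stand-in for SerializeIns plus SendClientToClientMsg:

```python
import random


def flush_buckets(send_vec, send_fn):
    """Flush per-trainer buckets in a randomized order.

    Sketch of the change above: shuffling the destination order spreads
    concurrent sends across trainers instead of having every worker hit
    trainer 0 first. `send_fn(node_id, batch)` is a hypothetical stand-in
    for SerializeIns plus the client-to-client send.
    """
    send_index = list(range(len(send_vec)))
    random.shuffle(send_index)  # counterpart of std::random_shuffle
    for node_id in send_index:
        if send_vec[node_id]:
            send_fn(node_id, send_vec[node_id])
            del send_vec[node_id][:]  # buckets are reused for the next batch
```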
diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h
index 8ea09b65ddd569e8ca8e24ba3b2416666d0eec92..648c874a0b8763b18118e18adf3b3e93acfd104b 100644
--- a/paddle/fluid/framework/data_feed.h
+++ b/paddle/fluid/framework/data_feed.h
@@ -94,6 +94,8 @@ class DataFeed {
   virtual void SetThreadNum(int thread_num) {}
   // This function will do nothing at default
   virtual void SetTrainerNum(int trainer_num) {}
+  // This function will do nothing at default
+  virtual void SetFleetSendBatchSize(int64_t size) {}
   virtual void SetFileListMutex(std::mutex* mutex) {
     mutex_for_pick_file_ = mutex;
   }
@@ -212,6 +214,7 @@ class InMemoryDataFeed : public PrivateQueueDataFeed<T> {
   virtual void SetThreadId(int thread_id);
   virtual void SetThreadNum(int thread_num);
   virtual void SetTrainerNum(int trainer_num);
+  virtual void SetFleetSendBatchSize(int64_t size);
   virtual void PutInsToChannel(const std::string& ins_str);
   virtual void FillMemoryDataToChannel();
   virtual void FillChannelToMemoryData();
diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc
index 600fc74710023c340a7b43053a38e1d82a11c976..a3b7b1e454ecec9da766b9b156c31b1317bb9d35 100644
--- a/paddle/fluid/framework/data_set.cc
+++ b/paddle/fluid/framework/data_set.cc
@@ -64,6 +64,17 @@ void DatasetImpl<T>::SetTrainerNum(int trainer_num) {
   }
 }
 
+// If you run distributed training and want to do a global shuffle,
+// set this before calling GlobalShuffle().
+// Make sure CreateReaders() has been called before SetFleetSendBatchSize().
+template <typename T>
+void DatasetImpl<T>::SetFleetSendBatchSize(int64_t size) {
+  fleet_send_batch_size_ = size;
+  for (auto reader : readers_) {
+    reader->SetFleetSendBatchSize(size);
+  }
+}
+
 template <typename T>
 void DatasetImpl<T>::SetHdfsConfig(const std::string& fs_name,
                                    const std::string& fs_ugi) {
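
DatasetImpl::SetFleetSendBatchSize only forwards the new size to readers that already exist, which is why the comment requires CreateReaders to run first; called earlier, the value is stored on the dataset but never reaches a reader. A small self-contained sketch with stub classes (not Paddle API) showing the ordering pitfall:

```python
class ReaderStub(object):
    """Stand-in for InMemoryDataFeed; only tracks the send batch size."""

    def __init__(self):
        self.fleet_send_batch_size = 1024  # pretend built-in default

    def set_fleet_send_batch_size(self, size):
        self.fleet_send_batch_size = size


class DatasetStub(object):
    """Mimics DatasetImpl: the setter reaches only already-created readers."""

    def __init__(self):
        self.readers = []
        self.fleet_send_batch_size = 1024

    def create_readers(self, num):
        self.readers = [ReaderStub() for _ in range(num)]

    def set_fleet_send_batch_size(self, size):
        self.fleet_send_batch_size = size
        for reader in self.readers:
            reader.set_fleet_send_batch_size(size)


ds = DatasetStub()
ds.set_fleet_send_batch_size(80000)  # too early: no readers exist yet
ds.create_readers(4)
print([r.fleet_send_batch_size for r in ds.readers])  # still the 1024 default
```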
diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h
index 6fd3fcad28fa045326032200b7f26a18862454f4..bbe0f937abfa635b126062059abfcfb70adb996e 100644
--- a/paddle/fluid/framework/data_set.h
+++ b/paddle/fluid/framework/data_set.h
@@ -47,6 +47,8 @@ class Dataset {
   virtual void SetThreadNum(int thread_num) = 0;
   // set workers' num
   virtual void SetTrainerNum(int trainer_num) = 0;
+  // set fleet send batch size
+  virtual void SetFleetSendBatchSize(int64_t size) = 0;
   // set fs name and ugi
   virtual void SetHdfsConfig(const std::string& fs_name,
                              const std::string& fs_ugi) = 0;
@@ -59,6 +61,8 @@ class Dataset {
   virtual int GetThreadNum() = 0;
   // get worker num
   virtual int GetTrainerNum() = 0;
+  // get fleet send batch size
+  virtual int64_t GetFleetSendBatchSize() = 0;
   // get hdfs config
   virtual std::pair<std::string, std::string> GetHdfsConfig() = 0;
   // get data fedd desc
@@ -98,6 +102,7 @@ class DatasetImpl : public Dataset {
   virtual void SetFileList(const std::vector<std::string>& filelist);
   virtual void SetThreadNum(int thread_num);
   virtual void SetTrainerNum(int trainer_num);
+  virtual void SetFleetSendBatchSize(int64_t size);
   virtual void SetHdfsConfig(const std::string& fs_name,
                              const std::string& fs_ugi);
   virtual void SetDataFeedDesc(const std::string& data_feed_desc_str);
@@ -105,6 +110,7 @@ class DatasetImpl : public Dataset {
   virtual const std::vector<std::string>& GetFileList() { return filelist_; }
   virtual int GetThreadNum() { return thread_num_; }
   virtual int GetTrainerNum() { return trainer_num_; }
+  virtual int64_t GetFleetSendBatchSize() { return fleet_send_batch_size_; }
   virtual std::pair<std::string, std::string> GetHdfsConfig() {
     return std::make_pair(fs_name_, fs_ugi_);
   }
@@ -137,6 +143,7 @@ class DatasetImpl : public Dataset {
   std::string fs_name_;
   std::string fs_ugi_;
   unsigned int rand_seed;
+  int64_t fleet_send_batch_size_;
 };
 
 // use std::vector<MultiSlotType> as data type
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc
index 8147c7746192a91bb82c2aa754c5664def4c142f..394ff24c466622956b18b3012c146f6f9ddd838e 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.cc
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc
@@ -237,6 +237,7 @@ void FleetWrapper::PushDenseParamSync(
   std::vector<paddle::ps::Region> regions;
   for (auto& t : var_names) {
     Variable* var = scope.FindVar(t);
+    CHECK(var != nullptr) << "var[" << t << "] not found";
     LoDTensor* tensor = var->GetMutable<LoDTensor>();
     float* g = tensor->mutable_data<float>(place);
     paddle::ps::Region reg(g, tensor->numel());
diff --git a/paddle/fluid/framework/io/shell.cc b/paddle/fluid/framework/io/shell.cc
index bcfa4f44ff1c6561cbbd60b76f75de1c8461a88a..ab671cb5690df51c1cff141906c40cc9e74584fa 100644
--- a/paddle/fluid/framework/io/shell.cc
+++ b/paddle/fluid/framework/io/shell.cc
@@ -126,7 +126,7 @@ static int shell_popen_fork_internal(const char* real_cmd, bool do_read,
   }
 
   close_open_fds_internal();
-  if (execl("/bin/sh", "sh", "-c", real_cmd, NULL) < 0) {
+  if (execl("/bin/bash", "bash", "-c", real_cmd, NULL) < 0) {
     return -1;
   }
   exit(127);
diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc
index b773fd03c003e4c5b51f4876e6ac999f9e830ce4..3f171b65ab83de5a0d84d3c29b1e82510bf69716 100644
--- a/paddle/fluid/pybind/data_set_py.cc
+++ b/paddle/fluid/pybind/data_set_py.cc
@@ -50,11 +50,15 @@ void BindDataset(py::module* m) {
       .def("set_filelist", &framework::Dataset::SetFileList)
       .def("set_thread_num", &framework::Dataset::SetThreadNum)
       .def("set_trainer_num", &framework::Dataset::SetTrainerNum)
+      .def("set_fleet_send_batch_size",
+           &framework::Dataset::SetFleetSendBatchSize)
       .def("set_hdfs_config", &framework::Dataset::SetHdfsConfig)
       .def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc)
       .def("get_filelist", &framework::Dataset::GetFileList)
       .def("get_thread_num", &framework::Dataset::GetThreadNum)
       .def("get_trainer_num", &framework::Dataset::GetTrainerNum)
+      .def("get_fleet_send_batch_size",
+           &framework::Dataset::GetFleetSendBatchSize)
       .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig)
       .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc)
       .def("register_client2client_msg_handler",
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index d63773223ddc0c155f26a656f19c4ba80f482632..e655fd4a976a8a6fa2811ddc43de3d1f231229d5 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -241,11 +241,13 @@ class InMemoryDataset(DatasetBase):
             fleet: fleet singleton. Default None.
         """
         trainer_num = 1
+        fleet_send_batch_size = 80000
         if fleet is not None:
             fleet.fleet_instance.role_maker_._barrier_worker()
             trainer_num = fleet.worker_num()
         self.dataset.register_client2client_msg_handler()
         self.dataset.set_trainer_num(trainer_num)
+        self.dataset.set_fleet_send_batch_size(fleet_send_batch_size)
         if fleet is not None:
             fleet.fleet_instance.role_maker_._barrier_worker()
         self.dataset.global_shuffle()
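
With this change global_shuffle always pushes an 80000-instance fleet send batch into the core dataset just before shuffling; there is no parameter here to override it. A hedged sketch of how a training script reaches this path (file names are placeholders and the rest of the feed configuration is omitted):

```python
import paddle.fluid as fluid

dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_filelist(["train_part_0.txt", "train_part_1.txt"])  # placeholder shards
dataset.set_thread_num(4)
# ... set_use_var / set_pipe_command and the rest of the feed config go here ...
dataset.load_into_memory()

# In a distributed job, pass the fleet singleton so trainer_num and the
# 80000-instance send batch are set before instances are exchanged; with
# fleet=None the shuffle stays local to this trainer.
dataset.global_shuffle(fleet=None)
```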
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index e4666deb7fabe3628856269b6c665aacec1e9ee4..e15197037e1d901855883919b02a1574b7bc9a29 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -712,7 +712,7 @@ class Executor(object):
         if dataset == None:
             raise RuntimeError("dataset is needed and should be initialized")
 
-        if self.place == paddle.fluid.CUDAPlace():
+        if not isinstance(self.place, core.CPUPlace):
             raise RuntimeError("infer_from_dataset is verified on CPUPlace"
                                "We will open CUDAPlace in the future")
 
@@ -796,7 +796,7 @@ class Executor(object):
         if dataset == None:
             raise RuntimeError("dataset is need and should be initialized")
 
-        if self.place == paddle.fluid.CUDAPlace():
+        if not isinstance(self.place, core.CPUPlace):
             raise RuntimeError("train_from_dataset is verified on CPUPlace"
                                "We will open CUDAPlace in the future")
 
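
The old guard compared self.place against a freshly constructed paddle.fluid.CUDAPlace(), which both requires a device id and would never compare equal to the executor's place, so the check could not work; the fix tests the type of the stored place instead. The same pattern in isolation:

```python
import paddle.fluid as fluid
from paddle.fluid import core

exe = fluid.Executor(fluid.CPUPlace())

# train_from_dataset / infer_from_dataset are only verified on CPU for now,
# so guard on the type of the executor's place rather than comparing
# place objects for equality.
if not isinstance(exe.place, core.CPUPlace):
    raise RuntimeError("train_from_dataset is verified on CPUPlace; "
                       "CUDAPlace will be opened in the future")
```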
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
index 044aa33c2b5b572aa40169e8c57936b105ba0121..9b1ec412c731a4b59d0da8847e91e30d8e1d864a 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
@@ -123,18 +123,25 @@ class Fleet(object):
             print("You should run DistributedOptimizer.minimize() first")
             sys.exit(-1)
 
-    def init_worker(self, programs):
+    def init_worker(self, programs, scopes=None):
         """
         init_worker(): will be called by user. When a user knows current process is_server(), he/she
                     should call init_worker() to initialize global information about worker and connect
-                    worker with pserver.
+                    worker with pserver. You should run the startup program before calling init_worker.
 
         Args:
             programs(Program|list): a Program or a list of Programs
-
+            scopes(Scope|list): a Scope or a list of Scopes, default None.
         """
         if not isinstance(programs, list):
             programs = [programs]
+        if scopes is None:
+            scopes = [fluid.global_scope()] * len(programs)
+        if len(scopes) != len(programs):
+            print(
+                "You should make sure len(scopes) == len(programs) or set scopes to None"
+            )
+            sys.exit(-1)
         if self._opt_info:
             if "fleet_desc" in self._opt_info:
                 self._dist_desc_str = text_format.MessageToString(
@@ -160,7 +167,7 @@ class Fleet(object):
             self.role_maker_._barrier_worker()
             if self.role_maker_._is_first_worker():
                 tables = self._dist_desc.trainer_param.dense_table
-                for prog in programs:
+                for prog, scope in zip(programs, scopes):
                     prog_id = str(id(prog))
                     prog_conf = self._opt_info['program_configs'][prog_id]
                     prog_tables = {}
@@ -174,10 +181,16 @@ class Fleet(object):
                             continue
                         var_name_list = []
                         for i in range(0, len(table.dense_variable_name)):
-                            var_name_list.append(table.dense_variable_name[i])
-                    self._fleet_ptr.init_model(prog.desc,
-                                               int(table.table_id),
-                                               var_name_list)
+                            var_name = table.dense_variable_name[i]
+                            if scope.find_var(var_name) is None:
+                                print("var " + var_name +
+                                      " not found in scope, " +
+                                      "you should run the startup program first")
+                                sys.exit(-1)
+                            var_name_list.append(var_name)
+                        self._fleet_ptr.init_model(scope,
+                                                   int(table.table_id),
+                                                   var_name_list)
             # barrier for init model done
             self.role_maker_._barrier_worker()
         else:
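
init_worker now accepts a scopes list parallel to programs and, before pushing the initial dense parameters, checks that every dense variable already exists in its scope, which is only true once the startup program has run. A hedged usage sketch; the module-level init_worker alias and the program/scope choices are assumptions about a typical pslib training script:

```python
import paddle.fluid as fluid
# Assumption: the package exposes a module-level init_worker alias bound to
# the Fleet singleton, as pslib training scripts of this era do.
from paddle.fluid.incubate.fleet.parameter_server import init_worker

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())  # creates the dense vars first

main_prog = fluid.default_main_program()
# One scope per program; omitting scopes falls back to
# [fluid.global_scope()] * len(programs).
init_worker([main_prog], scopes=[fluid.global_scope()])
```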
diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py
index 8c705a095c768c861aac07249467cf75bb289b2d..4cfd99150562438d9ca64a2b0db215915e682d34 100644
--- a/python/paddle/fluid/tests/unittests/test_dataset.py
+++ b/python/paddle/fluid/tests/unittests/test_dataset.py
@@ -29,7 +29,6 @@ class TestDataset(unittest.TestCase):
 
     def test_dataset_create(self):
         """ Testcase for dataset create. """
-        return
         try:
             dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
         except:
@@ -48,7 +47,6 @@ class TestDataset(unittest.TestCase):
 
     def test_dataset_config(self):
         """ Testcase for dataset configuration. """
-        return
         dataset = fluid.core.Dataset("MultiSlotDataset")
         dataset.set_thread_num(12)
         dataset.set_filelist(["a.txt", "b.txt", "c.txt"])
@@ -75,7 +73,6 @@ class TestDataset(unittest.TestCase):
         """
         Testcase for InMemoryDataset from create to run.
         """
-        return
         with open("test_in_memory_dataset_run_a.txt", "w") as f:
             data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
             data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
@@ -112,9 +109,10 @@ class TestDataset(unittest.TestCase):
         for i in range(2):
             try:
                 exe.train_from_dataset(fluid.default_main_program(), dataset)
-            except:
-                #self.assertTrue(False)
+            except ImportError as e:
                 pass
+            except Exception as e:
+                self.assertTrue(False)
 
         os.remove("./test_in_memory_dataset_run_a.txt")
         os.remove("./test_in_memory_dataset_run_b.txt")
@@ -123,7 +121,6 @@ class TestDataset(unittest.TestCase):
         """
         Testcase for QueueDataset from create to run.
         """
-        return
         with open("test_queue_dataset_run_a.txt", "w") as f:
             data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
             data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
@@ -156,15 +153,14 @@ class TestDataset(unittest.TestCase):
         for i in range(2):
             try:
                 exe.train_from_dataset(fluid.default_main_program(), dataset)
-            except:
-                #self.assertTrue(False)
+            except ImportError as e:
                 pass
+            except Exception as e:
+                self.assertTrue(False)
 
         os.remove("./test_queue_dataset_run_a.txt")
         os.remove("./test_queue_dataset_run_b.txt")
 
 
 if __name__ == '__main__':
-    #unittest.main()
-    import sys
-    sys.exit(0)
+    unittest.main()
diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py
index 4e957880f77a41d3dad9582bc7cc09af1d1a253b..871b663663e87a08ef3edaf58a4480b85caf4c4a 100644
--- a/python/paddle/fluid/trainer_factory.py
+++ b/python/paddle/fluid/trainer_factory.py
@@ -12,6 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from .trainer_desc import MultiTrainer, DistMultiTrainer
+from .device_worker import Hogwild, DownpourSGD
+
 __all__ = ["TrainerFactory"]
 
 
@@ -20,8 +23,6 @@ class TrainerFactory(object):
         pass
 
     def _create_trainer(self, opt_info=None):
-        from .trainer_desc import MultiTrainer, DistMultiTrainer
-        from .device_worker import Hogwild, DownpourSGD
         trainer = None
         device_worker = None
         if opt_info == None: