Commit 24522fe0 authored by ShawnXuan

consistent mode ok

Parent 5dda431f
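The diff below switches the iterator from one host buffer per GPU to a single consolidated buffer shared by all GPUs ("consistent mode"): each pipeline's output is copied into that buffer at a byte offset of `gpu_id * per_gpu_nbytes`. A minimal sketch of that gather, using plain numpy/ctypes stand-ins for the DALI tensors (all names here are illustrative, not the module's API):

```python
import ctypes
import numpy as np

# Hypothetical stand-in for per-GPU DALI outputs: each "GPU" yields one
# host-side (batch, h, w, c) uint8 batch.
num_gpus, batch, h, w, c = 2, 4, 8, 8, 3
per_gpu = [np.full((batch, h, w, c), g, dtype=np.uint8) for g in range(num_gpus)]

# One consolidated buffer whose leading dim spans all GPUs, mirroring
# `shape[0] = self._num_gpus * shape[0]` in the diff below.
consolidated = np.zeros((num_gpus * batch, h, w, c), dtype=np.uint8)
nbytes_per_gpu = per_gpu[0].nbytes

for gpu_id, src in enumerate(per_gpu):
    # Byte offset of this GPU's slot, as in `offset = gpu_id * self.category_nbytes[j]`.
    offset = gpu_id * nbytes_per_gpu
    dst = ctypes.c_void_p(consolidated.ctypes.data + offset)
    # The diff has DALI do `dali_tensor.copy_to_external(dst)`; memmove is
    # the plain host-side equivalent for this sketch.
    ctypes.memmove(dst, src.ctypes.data, src.nbytes)

# GPU 0's samples occupy the first `batch` rows, GPU 1's the next `batch`.
assert (consolidated[:batch] == 0).all() and (consolidated[batch:] == 1).all()
```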
@@ -119,7 +119,7 @@ class HybridValPipe(Pipeline):
         return [output, self.cast(self.labels)]
 
-def feed_ndarray(dali_tensor, arr):
+def feed_ndarray(dali_tensor, arr, offset):
     """
     Copy contents of DALI tensor to numpy's NDArray.
@@ -131,11 +131,11 @@ def feed_ndarray(dali_tensor, arr):
             Destination of the copy
     """
     # Wait until arr is no longer used by the engine
-    assert dali_tensor.shape() == list(arr.shape), \
-        ("Shapes do not match: DALI tensor has shape {0}"
-        ", but NDArray has shape {1}".format(dali_tensor.shape(), list(arr.shape)))
+    #assert dali_tensor.shape() == list(arr.shape), \
+    #    ("Shapes do not match: DALI tensor has shape {0}"
+    #    ", but NDArray has shape {1}".format(dali_tensor.shape(), list(arr.shape)))
     # Get CTypes void pointer to the underlying memory held by arr
-    c_type_pointer = ctypes.c_void_p(arr.ctypes.data)
+    c_type_pointer = ctypes.c_void_p(arr.ctypes.data + offset)
     # Copy data from DALI tensor to ptr
     dali_tensor.copy_to_external(c_type_pointer)
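With the new `offset` parameter, `feed_ndarray` writes into the middle of a larger array, which is also why the shape assertion had to be disabled: the DALI tensor now describes only a slice of `arr`. A small self-contained illustration of the pointer arithmetic (`ctypes.memmove` stands in for `copy_to_external`):

```python
import ctypes
import numpy as np

buf = np.zeros(8, dtype=np.int32)
part = np.array([7, 7], dtype=np.int32)
# Start writing at element 4: byte offset = index * itemsize, the same
# arithmetic as `arr.ctypes.data + offset` above.
offset = 4 * buf.itemsize
ctypes.memmove(buf.ctypes.data + offset, part.ctypes.data, part.nbytes)
print(buf)  # -> [0 0 0 0 7 7 0 0]
```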
@@ -208,13 +208,15 @@ class DALIGenericIterator(object):
         self._last_batch_padded = last_batch_padded
         self._auto_reset = auto_reset
         self._squeeze_labels = squeeze_labels
+        assert dynamic_shape == False, "support fixed shape only."
         self._dynamic_shape = dynamic_shape
         # Build all pipelines
         for p in self._pipes:
             with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
                 p.build()
         # Use double-buffering of data batches
-        self._data_batches = [[None] for i in range(self._num_gpus)]
+        #self._data_batches = [[None] for i in range(self._num_gpus)]
+        self._data_batches = [None for i in range(2)]
         self._counter = 0
         self._current_data_batch = 0
         self.output_map = output_map
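`_data_batches` loses its per-GPU outer dimension because every GPU now writes into the same consolidated arrays; the remaining length-2 list is the double buffer the surrounding code alternates through via `_current_data_batch`. A toy sketch of that two-slot rotation (names assumed from the surrounding code, not its exact control flow):

```python
# Two slots: while the caller consumes slot k, the next batch is
# assembled in slot 1 - k.
data_batches = [None, None]
current_data_batch = 0
for step in range(4):
    data_batches[current_data_batch] = f"batch-{step}"  # stands in for the numpy buffers
    copy_db_index = current_data_batch
    current_data_batch = (current_data_batch + 1) % 2
    print("returned to caller:", data_batches[copy_db_index])
```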
@@ -242,32 +244,35 @@ class DALIGenericIterator(object):
         for p in self._pipes:
             with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
                 outputs.append(p.share_outputs())
-        for i in range(self._num_gpus):
+        for gpu_id in range(self._num_gpus):
             # MXNet wants batches with clear distinction between
             # data and label entries, so segregate outputs into
             # 2 categories
             # Change DALI TensorLists into Tensors
             category_tensors = dict()
-            category_info = dict()
-            for j, out in enumerate(outputs[i]):
+            category_info = []
+            for j, out in enumerate(outputs[gpu_id]):
                 x = out.as_tensor()
                 category_tensors[self.output_map[j]] = x#.as_tensor()
                 if self._squeeze_labels and self.output_map[j]=='label':
                     category_tensors[self.output_map[j]].squeeze()
-                category_info[self.output_map[j]] = (x.shape(), np.dtype(x.dtype()))
+                category_info.append((x.shape(), np.dtype(x.dtype())))
             # If we did not yet allocate memory for that batch, do it now
-            if self._data_batches[i][self._current_data_batch] is None:
+            if self._data_batches[self._current_data_batch] is None:
                 for category in self.output_map:
                     t = category_tensors[category]
                     assert type(t) is not TensorGPU, "CPU data only"#TODO
                 d = []
-                for (shape, dtype) in category_info.values():
+                self.category_nbytes = []
+                for j, (shape, dtype) in enumerate(category_info):
+                    self.category_nbytes.append(np.zeros(shape, dtype = dtype).nbytes)
+                    shape[0] = self._num_gpus * shape[0]
                     d.append(np.zeros(shape, dtype = dtype))
-                self._data_batches[i][self._current_data_batch] = d
+                self._data_batches[self._current_data_batch] = d
-            d = self._data_batches[i][self._current_data_batch]
+            d = self._data_batches[self._current_data_batch]
             # Copy data from DALI Tensors to NDArrays
             if self._dynamic_shape:
                 for j, (shape, dtype) in enumerate(category_info):
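The allocation pass records each category's single-GPU byte size before scaling the leading dimension by the GPU count, so later copies can be placed with `offset = gpu_id * category_nbytes[j]`. A standalone sketch with made-up image/label shapes:

```python
import numpy as np

# Per-category (shape, dtype) as DALI would report them for ONE gpu;
# the shapes and dtypes here are illustrative only.
num_gpus = 2
category_info = [([4, 8, 8, 3], np.dtype(np.uint8)),   # 'data'
                 ([4], np.dtype(np.int32))]            # 'label'

category_nbytes = []
buffers = []
for shape, dtype in category_info:
    # Bytes contributed by a single GPU, recorded before the shape is scaled.
    category_nbytes.append(np.zeros(shape, dtype=dtype).nbytes)
    shape[0] = num_gpus * shape[0]  # leading dim now covers all GPUs
    buffers.append(np.zeros(shape, dtype=dtype))

print(category_nbytes)              # -> [768, 16]
print([b.shape for b in buffers])   # -> [(8, 8, 8, 3), (8,)]
```

Note that materializing `np.zeros(shape, ...)` just to read `.nbytes` allocates a throwaway array; `np.dtype(dtype).itemsize * int(np.prod(shape))` would give the same number without the allocation.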
@@ -275,7 +280,8 @@ class DALIGenericIterator(object):
                     d[j] = np.zeros(shape, dtype = dtype)
             for j, d_arr in enumerate(d):
-                feed_ndarray(category_tensors[self.output_map[j]], d_arr)
+                offset = gpu_id * self.category_nbytes[j]
+                feed_ndarray(category_tensors[self.output_map[j]], d_arr, offset)
         for p in self._pipes:
             with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
@@ -303,12 +309,7 @@ class DALIGenericIterator(object):
        # for db in self._data_batches:
        #     db[copy_db_index].pad = 0
-        #_data_batches[gpu_id][_current_data_batch][images, labels]
-        images = [db[copy_db_index][0] for db in self._data_batches]
-        labels = [db[copy_db_index][1] for db in self._data_batches]
-        #return images, labels
-        return np.concatenate(images), np.concatenate(labels)
-        #return [db[copy_db_index] for db in self._data_batches]
+        return self._data_batches[copy_db_index]
 
     def next(self):
         """
......
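The return path no longer concatenates per-GPU arrays on every batch: the consolidated buffers already hold all GPUs' samples contiguously, so the iterator hands them back as-is. A sketch of the difference (shapes are made up):

```python
import numpy as np

num_gpus, batch = 2, 4
per_gpu = [np.ones((batch, 3), dtype=np.float32) for _ in range(num_gpus)]

# Old path: a fresh allocation plus a copy for every batch returned.
merged_old = np.concatenate(per_gpu)

# New path: feed_ndarray(..., offset) already placed each GPU's batch in
# its slot, so the preallocated buffer is returned with no extra copy.
consolidated = np.zeros((num_gpus * batch, 3), dtype=np.float32)
merged_new = consolidated

assert merged_old.shape == merged_new.shape == (8, 3)
```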