提交 80642bee 编写于 作者: W wanghaoshuang

fix_xmap and refine flowers dataset

上级 633082ad
......@@ -25,8 +25,9 @@ import uci_housing
import sentiment
import wmt14
import mq2007
import flowers
__all__ = [
'mnist', 'imikolov', 'imdb', 'cifar', 'movielens', 'conll05', 'sentiment'
'uci_housing', 'wmt14', 'mq2007'
'uci_housing', 'wmt14', 'mq2007', 'flowers'
]
......@@ -34,9 +34,9 @@ from common import download
import tarfile
import scipy.io as scio
from paddle.v2.image import *
from paddle.v2.reader import *
import os
import numpy as np
import paddle.v2 as paddle
from multiprocessing import cpu_count
__all__ = ['train', 'test', 'valid']
......@@ -53,8 +53,8 @@ def default_mapper(sample):
map image bytes data to type needed by model input layer
'''
img, label = sample
img = paddle.image.load_image_bytes(img)
img = paddle.image.simple_transform(img, 256, 224, True)
img = load_image_bytes(img)
img = simple_transform(img, 256, 224, True)
return img.flatten().astype('float32'), label
......@@ -63,7 +63,8 @@ def reader_creator(data_file,
setid_file,
dataset_name,
mapper=default_mapper,
buffered_size=1024):
buffered_size=1024,
useXmap=True):
'''
1. read images from tar file and
merge images into batch files in 102flowers.tgz_batch/
......@@ -105,11 +106,13 @@ def reader_creator(data_file,
for sample, label in itertools.izip(data, batch['label']):
yield sample, int(label)
return paddle.reader.xmap_readers(mapper, reader,
cpu_count(), buffered_size)
if useXmap:
return xmap_readers(mapper, reader, cpu_count(), buffered_size)
else:
return map_readers(mapper, reader)
def train(mapper=default_mapper, buffered_size=1024):
def train(mapper=default_mapper, buffered_size=1024, useXmap=True):
'''
Create flowers training set reader.
It returns a reader, each sample in the reader is
......@@ -128,11 +131,11 @@ def train(mapper=default_mapper, buffered_size=1024):
return reader_creator(
download(DATA_URL, 'flowers', DATA_MD5),
download(LABEL_URL, 'flowers', LABEL_MD5),
download(SETID_URL, 'flowers', SETID_MD5), 'trnid', mapper,
buffered_size)
download(SETID_URL, 'flowers', SETID_MD5), 'tstid', mapper,
buffered_size, useXmap)
def test(mapper=default_mapper, buffered_size=1024):
def test(mapper=default_mapper, buffered_size=1024, useXmap=True):
'''
Create flowers test set reader.
It returns a reader, each sample in the reader is
......@@ -151,11 +154,11 @@ def test(mapper=default_mapper, buffered_size=1024):
return reader_creator(
download(DATA_URL, 'flowers', DATA_MD5),
download(LABEL_URL, 'flowers', LABEL_MD5),
download(SETID_URL, 'flowers', SETID_MD5), 'tstid', mapper,
buffered_size)
download(SETID_URL, 'flowers', SETID_MD5), 'trnid', mapper,
buffered_size, useXmap)
def valid(mapper=default_mapper, buffered_size=1024):
def valid(mapper=default_mapper, buffered_size=1024, useXmap=True):
'''
Create flowers validation set reader.
It returns a reader, each sample in the reader is
......@@ -175,7 +178,7 @@ def valid(mapper=default_mapper, buffered_size=1024):
download(DATA_URL, 'flowers', DATA_MD5),
download(LABEL_URL, 'flowers', LABEL_MD5),
download(SETID_URL, 'flowers', SETID_MD5), 'valid', mapper,
buffered_size)
buffered_size, useXmap)
def fetch():
......
......@@ -31,13 +31,13 @@ class TestFlowers(unittest.TestCase):
def test_train(self):
instances, max_label_value = self.check_reader(
paddle.v2.dataset.flowers.train())
self.assertEqual(instances, 1020)
self.assertEqual(instances, 6149)
self.assertEqual(max_label_value, 102)
def test_test(self):
instances, max_label_value = self.check_reader(
paddle.v2.dataset.flowers.test())
self.assertEqual(instances, 6149)
self.assertEqual(instances, 1020)
self.assertEqual(max_label_value, 102)
def test_valid(self):
......
......@@ -248,9 +248,6 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
:rtype: callable
"""
end = XmapEndSignal()
in_queue = Queue(buffer_size)
out_queue = Queue(buffer_size)
out_order = [0]
# define a worker to read samples from reader to in_queue
def read_worker(reader, in_queue):
......@@ -266,12 +263,6 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
in_order += 1
in_queue.put(end)
# start a read worker in a thread
target = order_read_worker if order else read_worker
t = Thread(target=target, args=(reader, in_queue))
t.daemon = True
t.start()
# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue
def handle_worker(in_queue, out_queue, mapper):
......@@ -298,6 +289,15 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
in_queue.put(end)
out_queue.put(end)
def xreader():
in_queue = Queue(buffer_size)
out_queue = Queue(buffer_size)
out_order = [0]
# start a read worker in a thread
target = order_read_worker if order else read_worker
t = Thread(target=target, args=(reader, in_queue))
t.daemon = True
t.start()
# start several handle_workers
target = order_handle_worker if order else handle_worker
args = (in_queue, out_queue, mapper, out_order) if order else (
......@@ -310,7 +310,6 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
for w in workers:
w.start()
def xreader():
sample = out_queue.get()
while not isinstance(sample, XmapEndSignal):
yield sample
......
......@@ -132,10 +132,12 @@ class TestXmap(unittest.TestCase):
for order in orders:
for tNum in thread_nums:
for size in buffered_size:
result = []
for i in paddle.v2.reader.xmap_readers(mapper,
reader = paddle.v2.reader.xmap_readers(mapper,
reader_creator_10(0),
tNum, size, order)():
tNum, size, order)
for n in xrange(3):
result = []
for i in reader():
result.append(i)
if not order:
result.sort()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册