# Copyright 2019 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """ Testing RandomCrop op in DE """ import numpy as np import mindspore.dataset.transforms.vision.c_transforms as c_vision import mindspore.dataset.transforms.vision.py_transforms as py_vision import mindspore.dataset.transforms.vision.utils as mode import mindspore.dataset as ds from mindspore import log as logger from util import save_and_check_md5, visualize GENERATE_GOLDEN = False DATA_DIR = ["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"] SCHEMA_DIR = "../data/dataset/test_tf_file_3_images/datasetSchema.json" def test_random_crop_op(plot=False): """ Test RandomCrop Op """ logger.info("test_random_crop_op") # First dataset data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200]) decode_op = c_vision.Decode() data1 = data1.map(input_columns=["image"], operations=decode_op) data1 = data1.map(input_columns=["image"], operations=random_crop_op) # Second dataset data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) data2 = data2.map(input_columns=["image"], operations=decode_op) image_cropped = [] image = [] for item1, item2 in zip(data1.create_dict_iterator(), data2.create_dict_iterator()): image1 = item1["image"] image2 = item2["image"] image_cropped.append(image1) image.append(image2) if plot: visualize(image, image_cropped) def test_random_crop_01_c(): """ Test RandomCrop op with c_transforms: size is a single integer, expected to pass """ logger.info("test_random_crop_01_c") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: If size is an int, a square crop of size (size, size) is returned. random_crop_op = c_vision.RandomCrop(512) decode_op = c_vision.Decode() data = data.map(input_columns=["image"], operations=decode_op) data = data.map(input_columns=["image"], operations=random_crop_op) filename = "random_crop_01_c_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_01_py(): """ Test RandomCrop op with py_transforms: size is a single integer, expected to pass """ logger.info("test_random_crop_01_py") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: If size is an int, a square crop of size (size, size) is returned. transforms = [ py_vision.Decode(), py_vision.RandomCrop(512), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data = data.map(input_columns=["image"], operations=transform()) filename = "random_crop_01_py_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_02_c(): """ Test RandomCrop op with c_transforms: size is a list/tuple with length 2, expected to pass """ logger.info("test_random_crop_02_c") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: If size is a sequence of length 2, it should be (height, width). random_crop_op = c_vision.RandomCrop([512, 375]) decode_op = c_vision.Decode() data = data.map(input_columns=["image"], operations=decode_op) data = data.map(input_columns=["image"], operations=random_crop_op) filename = "random_crop_02_c_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_02_py(): """ Test RandomCrop op with py_transforms: size is a list/tuple with length 2, expected to pass """ logger.info("test_random_crop_02_py") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: If size is a sequence of length 2, it should be (height, width). transforms = [ py_vision.Decode(), py_vision.RandomCrop([512, 375]), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data = data.map(input_columns=["image"], operations=transform()) filename = "random_crop_02_py_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_03_c(): """ Test RandomCrop op with c_transforms: input image size == crop size, expected to pass """ logger.info("test_random_crop_03_c") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The size of the image is 4032*2268 random_crop_op = c_vision.RandomCrop([2268, 4032]) decode_op = c_vision.Decode() data = data.map(input_columns=["image"], operations=decode_op) data = data.map(input_columns=["image"], operations=random_crop_op) filename = "random_crop_03_c_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_03_py(): """ Test RandomCrop op with py_transforms: input image size == crop size, expected to pass """ logger.info("test_random_crop_03_py") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The size of the image is 4032*2268 transforms = [ py_vision.Decode(), py_vision.RandomCrop([2268, 4032]), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data = data.map(input_columns=["image"], operations=transform()) filename = "random_crop_03_py_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_04_c(): """ Test RandomCrop op with c_transforms: input image size < crop size, expected to fail """ logger.info("test_random_crop_04_c") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) try: # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The size of the image is 4032*2268 random_crop_op = c_vision.RandomCrop([2268, 4033]) decode_op = c_vision.Decode() data = data.map(input_columns=["image"], operations=decode_op) data = data.map(input_columns=["image"], operations=random_crop_op) image_list = [] for item in data.create_dict_iterator(): image = item["image"] image_list.append(image.shape) except BaseException as e: logger.info("Got an exception in DE: {}".format(str(e))) def test_random_crop_04_py(): """ Test RandomCrop op with py_transforms: input image size < crop size, expected to fail """ logger.info("test_random_crop_04_py") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) try: # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The size of the image is 4032*2268 transforms = [ py_vision.Decode(), py_vision.RandomCrop([2268, 4033]), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data = data.map(input_columns=["image"], operations=transform()) image_list = [] for item in data.create_dict_iterator(): image = (item["image"].transpose(1, 2, 0) * 255).astype(np.uint8) image_list.append(image.shape) except BaseException as e: logger.info("Got an exception in DE: {}".format(str(e))) def test_random_crop_05_c(): """ Test RandomCrop op with c_transforms: input image size < crop size but pad_if_needed is enabled, expected to pass """ logger.info("test_random_crop_05_c") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The size of the image is 4032*2268 random_crop_op = c_vision.RandomCrop([2268, 4033], [200, 200, 200, 200], pad_if_needed=True) decode_op = c_vision.Decode() data = data.map(input_columns=["image"], operations=decode_op) data = data.map(input_columns=["image"], operations=random_crop_op) filename = "random_crop_05_c_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_05_py(): """ Test RandomCrop op with py_transforms: input image size < crop size but pad_if_needed is enabled, expected to pass """ logger.info("test_random_crop_05_py") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The size of the image is 4032*2268 transforms = [ py_vision.Decode(), py_vision.RandomCrop([2268, 4033], [200, 200, 200, 200], pad_if_needed=True), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data = data.map(input_columns=["image"], operations=transform()) filename = "random_crop_05_py_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_06_c(): """ Test RandomCrop op with c_transforms: invalid size, expected to raise TypeError """ logger.info("test_random_crop_06_c") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) try: # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: if size is neither an int nor a list of length 2, an exception will raise random_crop_op = c_vision.RandomCrop([512, 512, 375]) decode_op = c_vision.Decode() data = data.map(input_columns=["image"], operations=decode_op) data = data.map(input_columns=["image"], operations=random_crop_op) image_list = [] for item in data.create_dict_iterator(): image = item["image"] image_list.append(image.shape) except TypeError as e: logger.info("Got an exception in DE: {}".format(str(e))) assert "Size" in str(e) def test_random_crop_06_py(): """ Test RandomCrop op with py_transforms: invalid size, expected to raise TypeError """ logger.info("test_random_crop_06_py") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) try: # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: if size is neither an int nor a list of length 2, an exception will raise transforms = [ py_vision.Decode(), py_vision.RandomCrop([512, 512, 375]), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data = data.map(input_columns=["image"], operations=transform()) image_list = [] for item in data.create_dict_iterator(): image = (item["image"].transpose(1, 2, 0) * 255).astype(np.uint8) image_list.append(image.shape) except TypeError as e: logger.info("Got an exception in DE: {}".format(str(e))) assert "Size" in str(e) def test_random_crop_07_c(): """ Test RandomCrop op with c_transforms: padding_mode is Border.CONSTANT and fill_value is 255 (White), expected to pass """ logger.info("test_random_crop_07_c") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The padding_mode is default as Border.CONSTANT and set filling color to be white. random_crop_op = c_vision.RandomCrop(512, [200, 200, 200, 200], fill_value=(255, 255, 255)) decode_op = c_vision.Decode() data = data.map(input_columns=["image"], operations=decode_op) data = data.map(input_columns=["image"], operations=random_crop_op) filename = "random_crop_07_c_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_07_py(): """ Test RandomCrop op with py_transforms: padding_mode is Border.CONSTANT and fill_value is 255 (White), expected to pass """ logger.info("test_random_crop_07_py") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The padding_mode is default as Border.CONSTANT and set filling color to be white. transforms = [ py_vision.Decode(), py_vision.RandomCrop(512, [200, 200, 200, 200], fill_value=(255, 255, 255)), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data = data.map(input_columns=["image"], operations=transform()) filename = "random_crop_07_py_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_08_c(): """ Test RandomCrop op with c_transforms: padding_mode is Border.EDGE, expected to pass """ logger.info("test_random_crop_08_c") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The padding_mode is Border.EDGE. random_crop_op = c_vision.RandomCrop(512, [200, 200, 200, 200], padding_mode=mode.Border.EDGE) decode_op = c_vision.Decode() data = data.map(input_columns=["image"], operations=decode_op) data = data.map(input_columns=["image"], operations=random_crop_op) filename = "random_crop_08_c_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_08_py(): """ Test RandomCrop op with py_transforms: padding_mode is Border.EDGE, expected to pass """ logger.info("test_random_crop_08_py") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) # Note: The padding_mode is Border.EDGE. transforms = [ py_vision.Decode(), py_vision.RandomCrop(512, [200, 200, 200, 200], padding_mode=mode.Border.EDGE), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data = data.map(input_columns=["image"], operations=transform()) filename = "random_crop_08_py_result.npz" save_and_check_md5(data, filename, generate_golden=GENERATE_GOLDEN) def test_random_crop_09(): """ Test RandomCrop op: invalid type of input image (not PIL), expected to raise TypeError """ logger.info("test_random_crop_09") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) # Generate dataset data = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) transforms = [ py_vision.Decode(), py_vision.ToTensor(), # Note: if input is not PIL image, TypeError will raise py_vision.RandomCrop(512) ] transform = py_vision.ComposeOp(transforms) try: data = data.map(input_columns=["image"], operations=transform()) image_list = [] for item in data.create_dict_iterator(): image = item["image"] image_list.append(image.shape) except BaseException as e: logger.info("Got an exception in DE: {}".format(str(e))) assert "should be PIL Image" in str(e) def test_random_crop_comp(plot=False): """ Test RandomCrop and compare between python and c image augmentation """ logger.info("Test RandomCrop with c_transform and py_transform comparison") ds.config.set_seed(0) ds.config.set_num_parallel_workers(1) cropped_size = 512 # First dataset data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) random_crop_op = c_vision.RandomCrop(cropped_size) decode_op = c_vision.Decode() data1 = data1.map(input_columns=["image"], operations=decode_op) data1 = data1.map(input_columns=["image"], operations=random_crop_op) # Second dataset data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) transforms = [ py_vision.Decode(), py_vision.RandomCrop(cropped_size), py_vision.ToTensor() ] transform = py_vision.ComposeOp(transforms) data2 = data2.map(input_columns=["image"], operations=transform()) image_c_cropped = [] image_py_cropped = [] for item1, item2 in zip(data1.create_dict_iterator(), data2.create_dict_iterator()): c_image = item1["image"] py_image = (item2["image"].transpose(1, 2, 0) * 255).astype(np.uint8) image_c_cropped.append(c_image) image_py_cropped.append(py_image) if plot: visualize(image_c_cropped, image_py_cropped) if __name__ == "__main__": test_random_crop_01_c() test_random_crop_02_c() test_random_crop_03_c() test_random_crop_04_c() test_random_crop_05_c() test_random_crop_06_c() test_random_crop_07_c() test_random_crop_08_c() test_random_crop_01_py() test_random_crop_02_py() test_random_crop_03_py() test_random_crop_04_py() test_random_crop_05_py() test_random_crop_06_py() test_random_crop_07_py() test_random_crop_08_py() test_random_crop_09() test_random_crop_op(True) test_random_crop_comp(True)