test_transformer.py
import os
import time
import unittest
import sys
import logging
import numpy as np

import set_env
from data import build_source
from data import transform as tf

logger = logging.getLogger(__name__)

logging.basicConfig(level=logging.INFO)


class TestTransformer(unittest.TestCase):
    """Test cases for dataset.transform.transformer
    """

    @classmethod
    def setUpClass(cls):
        """ setup
        """

        prefix = os.path.dirname(os.path.abspath(__file__))
        # COCO-style annotation file (json) and image directory provided by set_env
        anno_path = set_env.coco_data['TRAIN']['ANNO_FILE']
        image_dir = set_env.coco_data['TRAIN']['IMAGE_DIR']
        cls.sc_config = {
            'anno_file': anno_path,
            'image_dir': image_dir,
            'samples': 200
        }

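        # transform op configs applied in order to every sample:
        # decode the image to RGB, resize it, then arrange the fields for an RCNN model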
        cls.ops = [{
            'op': 'DecodeImage',
            'to_rgb': True
        }, {
            'op': 'ResizeImage',
            'target_size': 800,
            'max_size': 1333
        }, {
            'op': 'ArrangeRCNN',
            'is_mask': False
        }]

    @classmethod
    def tearDownClass(cls):
        """ tearDownClass """
        pass

    def test_map(self):
        """ test transformer.map
        """
        mapper = tf.build(self.ops)
        ds = build_source(self.sc_config)
        mapped_ds = tf.map(ds, mapper)
        ct = 0
        for sample in mapped_ds:
            self.assertIsInstance(sample[0], np.ndarray)
            ct += 1

        self.assertEqual(ct, mapped_ds.size())

    def test_parallel_map(self):
        """ test transformer.map with concurrent workers
        """
        mapper = tf.build(self.ops)
        ds = build_source(self.sc_config)
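        # run the mapper in 2 concurrent workers, each in a separate process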
        worker_conf = {'WORKER_NUM': 2, 'use_process': True}
        mapped_ds = tf.map(ds, mapper, worker_conf)

        ct = 0
        for sample in mapped_ds:
            self.assertIsInstance(sample[0], np.ndarray)
            ct += 1

        self.assertTrue(mapped_ds.drained())
        self.assertEqual(ct, mapped_ds.size())
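        # reset and iterate a second time to verify the mapped dataset can be re-consumed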
        mapped_ds.reset()

        ct = 0
        for sample in mapped_ds:
            self.assertIsInstance(sample[0], np.ndarray)
            ct += 1

        self.assertEqual(ct, mapped_ds.size())

    def test_batch(self):
        """ test batched dataset
        """
        batchsize = 2
        mapper = tf.build(self.ops)
        ds = build_source(self.sc_config)
        mapped_ds = tf.map(ds, mapper)
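        # group the mapped samples into fixed-size batches and check the batch length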
        batched_ds = tf.batch(mapped_ds, batchsize, True)
        for batch in batched_ds:
            out = batch
        self.assertEqual(len(out), batchsize)


if __name__ == '__main__':
    unittest.main()