提交 d266bac9 编写于 作者: X xuezhong

remove test temporal

test=develop
上级 eb7bc3e7
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
from op_test import OpTest
class Sampler(object):
def __init__(self, range, seed):
self.range_ = range
self.seed_ = seed
np.random.seed(self.seed_)
def sample(self):
rasie("No Implementation!")
def probability(self, value):
raise ("No Implementation!")
class LogUniformSampler(Sampler):
def __init__(self, range, seed):
super(LogUniformSampler, self).__init__(range, seed)
self.log_range_ = np.log(self.range_ + 1)
def sample(self):
value = int(np.exp(np.random.uniform(0.0, self.log_range_)) - 1)
return value % self.range_
def probability(self, value):
return np.log((value + 2.0) / (value + 1.0)) / self.log_range_
def adjust_prob(prob, num_samples, num_tries):
if num_samples == num_tries:
return prob * num_samples
else:
return -np.expm1(num_tries * np.log1p(-prob))
def take_along_axis1(array, index):
out = np.zeros_like(index, dtype=array.dtype)
n_row, n_col = index.shape
for i in range(n_row):
for j in range(n_col):
out[i, j] = array[i, index[i, j]]
return out
def sample_prob(sampler, num_samples, labels):
batch_size, num_true = labels.shape
num_sampled_classes = num_samples + num_true
samples = np.zeros((batch_size, num_sampled_classes), dtype=np.int64)
probabilities = np.zeros(
(batch_size, num_sampled_classes), dtype=np.float64)
tmp_samples = set()
num_tries = 0
j = 0
while j < num_true:
for i in range(batch_size):
samples[i, j] = labels[i, j]
probabilities[i, j] = sampler.probability(labels[i, j])
j += 1
while j < num_sampled_classes:
v = sampler.sample()
num_tries += 1
if v not in tmp_samples:
tmp_samples.add(v)
for i in range(batch_size):
samples[i, j] = v
probabilities[i, j] = sampler.probability(v)
j += 1
for k in range(num_sampled_classes):
for i in range(batch_size):
probabilities[i, k] = adjust_prob(probabilities[i, k], num_samples,
num_tries)
return (samples, probabilities)
def compute_remove_accidental_hits(sampled_logits, samples, num_true):
batch_size, num_sampled_classes = samples.shape
for i in range(batch_size):
true_labels = set(samples[i, np.arange(num_true)])
for j in range(num_true, num_sampled_classes):
if samples[i, j] in true_labels:
sampled_logits[i, j] -= 1e20
def sample_logits(logits,
labels,
num_samples,
seed,
remove_accidental_hits,
use_customized_samples,
customized_samples=None,
customized_probabilities=None):
batch_size, num_classes = logits.shape
num_true = labels.shape[1]
num_sampled_classes = num_true + num_samples
if use_customized_samples:
samples = customized_samples
probabilities = customized_probabilities
else:
sampler = LogUniformSampler(num_classes, seed)
samples, probabilities = sample_prob(sampler, num_samples, labels)
sampled_logits = take_along_axis1(logits, samples)
if remove_accidental_hits:
compute_remove_accidental_hits(sampled_logits, samples, num_true)
sampled_logits -= np.log(probabilities)
sampled_labels = np.tile(np.arange(num_true), (batch_size, 1))
return (sampled_logits, samples, sampled_labels, probabilities)
class TestSampleLogitsOp(OpTest):
'''
Test SampleLogitsOp, but with random results precomputed
in python and just test the non-random part.
'''
def generate_data(self, logits, labels, num_samples, seed,
remove_accidental_hits, use_customized_samples,
customized_samples, customized_probabilities):
self.attrs = {
'num_samples': num_samples,
'use_customized_samples': use_customized_samples,
'remove_accidental_hits': remove_accidental_hits,
'seed': seed
}
self.inputs = {
'Logits': logits,
'Labels': labels,
'CustomizedSamples': customized_samples,
'CustomizedProbabilities': customized_probabilities
}
def set_data(self, batch_size, num_classes, num_true, num_samples, seed,
remove_accidental_hits):
logits = np.random.randn(batch_size, num_classes)
labels = np.stack([
np.random.choice(
range(0, num_classes), num_true, replace=False)
for _ in range(batch_size)
])
sampler = LogUniformSampler(num_classes, seed)
customized_samples, customized_probabilities = \
sample_prob(sampler, num_samples, labels)
use_customized_samples = True
remove_accidental_hits = remove_accidental_hits
self.generate_data(logits, labels, num_samples, seed,
remove_accidental_hits, use_customized_samples,
customized_samples, customized_probabilities)
def compute(self):
out = sample_logits(self.inputs["Logits"], self.inputs["Labels"],
self.attrs["num_samples"], self.attrs["seed"],
self.attrs["remove_accidental_hits"],
self.attrs["use_customized_samples"],
self.inputs["CustomizedSamples"],
self.inputs["CustomizedProbabilities"])
self.outputs = {
'SampledLogits': out[0],
'Samples': out[1],
'SampledLabels': out[2],
'Probabilities': out[3]
}
def setUp(self):
self.op_type = 'sample_logits'
batch_size = 5
num_classes = 20
num_true = 5
num_samples = 10
seed = 10
remove_accidental_hits = True
self.set_data(batch_size, num_classes, num_true, num_samples, seed,
remove_accidental_hits)
self.compute()
def test_check_output(self):
self.check_output()
def test_check_grad(self):
pass
self.check_grad(
["Logits"], ["SampledLogits", "Samples"], max_relative_error=0.02)
class TestSampleLogitsOp2(TestSampleLogitsOp):
def setUp(self):
self.op_type = 'sample_logits'
batch_size = 5
num_classes = 20
num_true = 5
num_samples = 10
seed = 10
remove_accidental_hits = False
self.set_data(batch_size, num_classes, num_true, num_samples, seed,
remove_accidental_hits)
self.compute()
class TestSampleLogitsOp3(TestSampleLogitsOp):
def setUp(self):
self.op_type = 'sample_logits'
batch_size = 5
num_classes = 100
num_true = 5
num_samples = 25
seed = 10
remove_accidental_hits = True
self.set_data(batch_size, num_classes, num_true, num_samples, seed,
remove_accidental_hits)
self.compute()
class TestSampleLogitsOp4(TestSampleLogitsOp):
def setUp(self):
self.op_type = 'sample_logits'
batch_size = 5
num_classes = 100
num_true = 5
num_samples = 25
seed = 10
remove_accidental_hits = False
self.set_data(batch_size, num_classes, num_true, num_samples, seed,
remove_accidental_hits)
self.compute()
class TestSampleLogitsOpV2(OpTest):
'''
Test SampleLogitsOp, but with random results precomputed
in C++ and copied to python and just test the non-random part.
'''
def generate_data(self, logits, labels, num_samples, seed,
remove_accidental_hits, use_customized_samples):
self.attrs = {
'num_samples': num_samples,
'use_customized_samples': use_customized_samples,
'remove_accidental_hits': remove_accidental_hits,
'seed': seed
}
self.inputs = {'Logits': logits, 'Labels': labels.astype(np.int64)}
def set_data(self, num_classes, num_samples, seed, remove_accidental_hits):
labels = np.array([[6, 12, 15, 5, 1], [0, 9, 4, 1, 10],
[0, 2, 10, 16, 13], [14, 4, 7, 2, 1],
[3, 18, 11, 8, 14]])
batch_size, num_true = labels.shape
use_customized_samples = False
num_sampled_classes = num_samples + num_true
logits = np.random.randn(batch_size, num_classes)
remove_accidental_hits = remove_accidental_hits
self.generate_data(logits, labels, num_samples, seed,
remove_accidental_hits, use_customized_samples)
# python and c++ use different random generator
# use fetched samples from c++ for python code
self.fetched_samples = np.array(
[[6, 12, 15, 5, 1, 5, 15, 1, 0, 8, 3, 14, 2, 13, 4],
[0, 9, 4, 1, 10, 5, 15, 1, 0, 8, 3, 14, 2, 13, 4],
[0, 2, 10, 16, 13, 5, 15, 1, 0, 8, 3, 14, 2, 13, 4],
[14, 4, 7, 2, 1, 5, 15, 1, 0, 8, 3, 14, 2, 13, 4],
[3, 18, 11, 8, 14, 5, 15, 1, 0, 8, 3, 14, 2, 13, 4]])
fectched_num_tries = 21
probabilities = np.zeros(
(batch_size, num_sampled_classes), dtype=np.float64)
sampler = LogUniformSampler(num_classes, seed)
for j in range(num_sampled_classes):
for i in range(batch_size):
probabilities[i, j] = sampler.probability(self.fetched_samples[
i, j])
probabilities[i, j] = adjust_prob(
probabilities[i, j], num_samples, fectched_num_tries)
self.probabilities = probabilities
def compute(self):
out = sample_logits(self.inputs["Logits"], self.inputs["Labels"],
self.attrs["num_samples"], self.attrs["seed"],
self.attrs["remove_accidental_hits"], True,
self.fetched_samples.astype(np.int64),
self.probabilities)
self.outputs = {
'SampledLogits': out[0],
'Samples': out[1],
'SampledLabels': out[2],
'Probabilities': out[3]
}
def setUp(self):
self.op_type = 'sample_logits'
num_samples = 10
num_classes = 20
seed = 10
remove_accidental_hits = True
self.set_data(num_classes, num_samples, seed, remove_accidental_hits)
self.compute()
def test_check_output(self):
self.check_output()
def test_check_grad(self):
pass
self.check_grad(
["Logits"], ["SampledLogits", "Samples"], max_relative_error=0.02)
class TestSampleLogitsOpV3(OpTest):
'''
Test SampleLogitsOp, but with random results precomputed
in C++ and copied to python and just test the non-random part.
'''
def generate_data(self, logits, labels, num_samples, seed,
remove_accidental_hits, use_customized_samples):
self.attrs = {
'num_samples': num_samples,
'use_customized_samples': use_customized_samples,
'remove_accidental_hits': remove_accidental_hits,
'seed': seed
}
self.inputs = {'Logits': logits, 'Labels': labels.astype(np.int64)}
def set_data(self, num_classes, num_samples, seed, remove_accidental_hits):
labels = [52, 2, 2, 17, 96, 2, 17, 96, 37, 2]
samples = [
3, 12, 74, 28, 1, 79, 2, 42, 8, 13, 0, 18, 88, 49, 14, 46, 39, 57,
26, 75, 9, 50, 16, 66, 6, 23, 5, 11, 17, 54, 35, 20, 53, 10, 47, 80,
38, 7, 4, 31, 15, 19, 58, 22, 34, 41, 73, 62, 95, 25, 70, 37, 30,
65, 27, 51, 43, 32, 99, 21, 56, 29, 40, 69, 55, 98, 77, 67, 33, 89,
63, 81, 59, 48, 91, 68, 72, 61, 52, 86
]
self.fetched_samples = np.array([[x] + samples for x in labels])
fectched_num_tries = 323
labels = self.fetched_samples[:, 0:1]
batch_size, num_true = labels.shape
use_customized_samples = False
num_sampled_classes = num_samples + num_true
logits = np.random.randn(batch_size, num_classes)
remove_accidental_hits = remove_accidental_hits
self.generate_data(logits, labels, num_samples, seed,
remove_accidental_hits, use_customized_samples)
# python and c++ use different random generator
# use fetched samples from c++ for python code
probabilities = np.zeros(
(batch_size, num_sampled_classes), dtype=np.float64)
sampler = LogUniformSampler(num_classes, seed)
for j in range(num_sampled_classes):
for i in range(batch_size):
probabilities[i, j] = sampler.probability(self.fetched_samples[
i, j])
probabilities[i, j] = adjust_prob(
probabilities[i, j], num_samples, fectched_num_tries)
self.probabilities = probabilities
def compute(self):
out = sample_logits(self.inputs["Logits"], self.inputs["Labels"],
self.attrs["num_samples"], self.attrs["seed"],
self.attrs["remove_accidental_hits"], True,
self.fetched_samples.astype(np.int64),
self.probabilities)
self.outputs = {
'SampledLogits': out[0],
'Samples': out[1],
'SampledLabels': out[2],
'Probabilities': out[3]
}
def setUp(self):
self.op_type = 'sample_logits'
num_samples = 80
num_classes = 100
seed = 123
remove_accidental_hits = True
self.set_data(num_classes, num_samples, seed, remove_accidental_hits)
self.compute()
def test_check_output(self):
self.check_output()
def test_check_grad(self):
pass
self.check_grad(
["Logits"], ["SampledLogits", "Samples"], max_relative_error=0.02)
if __name__ == '__main__':
unittest.main()
......@@ -156,26 +156,8 @@ def append_input_output(block, op_proto, np_list, is_input, dtype):
return var_dict
def var_cast(block, input):
if input.dtype == core.VarDesc.VarType.FP32 or input.dtype == core.VarDesc.VarType.FP32:
return input
out = block.create_var(dtype="float32", shape=[1])
op = block.append_op(
inputs={"X": input},
outputs={"Out": out},
type='cast',
attrs={
'out_dtype': core.VarDesc.VarType.FP32,
'in_dtype': input.dtype
})
op.desc.infer_var_type(block.desc)
op.desc.infer_shape(block.desc)
return out
def append_loss_ops(block, output_names):
mean_inputs = list(map(block.var, output_names))
mean_inputs = [var_cast(block, x) for x in mean_inputs]
if len(mean_inputs) == 1:
loss = block.create_var(dtype=mean_inputs[0].dtype, shape=[1])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册