提交 45dbfdde 编写于 作者: M mjsax 提交者: mbalassi

[storm-compat] Added tests for Storm compatibility wrappers

上级 0a8b63aa
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;
import static org.mockito.Mockito.mock;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.junit.Assert;
import org.junit.Test;
public class FlinkDummyRichFunctionTest {
@Test
public void testRuntimeContext() {
final FlinkDummyRichFunction dummy = new FlinkDummyRichFunction();
final RuntimeContext context = mock(RuntimeContext.class);
dummy.setRuntimeContext(context);
Assert.assertSame(context, dummy.getRuntimeContext());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
public class StormBoltWrapperTest {
@SuppressWarnings("unused")
@Test(expected = IllegalArgumentException.class)
public void testWrapperRawType() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy1", "dummy2"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
}
@SuppressWarnings("unused")
@Test(expected = IllegalArgumentException.class)
public void testWrapperToManyAttributes1() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
final String[] schema = new String[26];
for(int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
}
@SuppressWarnings("unused")
@Test(expected = IllegalArgumentException.class)
public void testWrapperToManyAttributes2() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
final String[] schema = new String[26];
for(int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), false);
}
@Test
public void testWrapper() throws Exception {
for(int i = 0; i < 26; ++i) {
this.testWrapper(i);
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
private void testWrapper(final int numberOfAttributes) throws Exception {
assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
Tuple flinkTuple = null;
String rawTuple = null;
if(numberOfAttributes == 0) {
rawTuple = new String("test");
} else {
flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
}
String[] schema = new String[numberOfAttributes];
if(numberOfAttributes == 0) {
schema = new String[1];
}
for(int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
final StreamRecord record = mock(StreamRecord.class);
if(numberOfAttributes == 0) {
when(record.getObject()).thenReturn(rawTuple);
} else {
when(record.getObject()).thenReturn(flinkTuple);
}
final StreamRecordSerializer serializer = mock(StreamRecordSerializer.class);
when(serializer.createInstance()).thenReturn(record);
final IndexedReaderIterator reader = mock(IndexedReaderIterator.class);
when(reader.next(record)).thenReturn(record).thenReturn(null);
final StreamTaskContext taskContext = mock(StreamTaskContext.class);
when(taskContext.getInputSerializer(0)).thenReturn(serializer);
when(taskContext.getIndexedInput(0)).thenReturn(reader);
final IRichBolt bolt = mock(IRichBolt.class);
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
wrapper.setup(taskContext);
wrapper.callUserFunction();
if(numberOfAttributes == 0) {
verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
} else {
verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple)));
}
wrapper.run();
if(numberOfAttributes == 0) {
verify(bolt, times(2)).execute(eq(new StormTuple<String>(rawTuple)));
} else {
verify(bolt, times(2)).execute(eq(new StormTuple<Tuple>(flinkTuple)));
}
}
@Test
public void testOpen() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
wrapper.open(mock(Configuration.class));
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
}
@Test
public void testOpenSink() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
wrapper.open(mock(Configuration.class));
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
}
@SuppressWarnings("unchecked")
@Test
public void testClose() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
final StreamTaskContext<Object> taskContext = mock(StreamTaskContext.class);
when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
wrapper.setup(taskContext);
wrapper.close();
verify(bolt).cleanup();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import backtype.storm.tuple.Values;
public class StormCollectorTest extends AbstractTest {
@Test
public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
for(int i = 0; i < 26; ++i) {
this.testStromCollector(true, i);
}
}
@Test
public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
for(int i = 0; i < 26; ++i) {
this.testStromCollector(false, i);
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void testStromCollector(final boolean spoutTest, final int numberOfAttributes)
throws InstantiationException, IllegalAccessException {
assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
final Collector flinkCollector = mock(Collector.class);
Tuple flinkTuple = null;
final Values tuple = new Values();
StormCollector<?> collector = null;
if(numberOfAttributes == 0) {
collector = new StormCollector(numberOfAttributes, flinkCollector);
tuple.add(new Integer(this.r.nextInt()));
} else {
collector = new StormCollector(numberOfAttributes, flinkCollector);
flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
for(int i = 0; i < numberOfAttributes; ++i) {
tuple.add(new Integer(this.r.nextInt()));
flinkTuple.setField(tuple.get(i), i);
}
}
final String streamId = "streamId";
final Collection anchors = mock(Collection.class);
final List<Integer> taskIds;
final Object messageId = new Integer(this.r.nextInt());
if(spoutTest) {
taskIds = collector.emit(streamId, tuple, messageId);
} else {
taskIds = collector.emit(streamId, anchors, tuple);
}
Assert.assertNull(taskIds);
if(numberOfAttributes == 0) {
verify(flinkCollector).collect(tuple.get(0));
} else {
verify(flinkCollector).collect(flinkTuple);
}
}
@Test(expected = UnsupportedOperationException.class)
public void testReportError() {
new StormCollector<Object>(1, null).reportError(null);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test(expected = UnsupportedOperationException.class)
public void testBoltEmitDirect() {
new StormCollector<Object>(1, null).emitDirect(0, (String)null, (Collection)null, (List)null);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test(expected = UnsupportedOperationException.class)
public void testSpoutEmitDirect() {
new StormCollector<Object>(1, null).emitDirect(0, (String)null, (List)null, (Object)null);
}
@Test(expected = UnsupportedOperationException.class)
public void testAck() {
new StormCollector<Object>(1, null).ack(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testFail() {
new StormCollector<Object>(1, null).fail(null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.LinkedList;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormFiniteSpoutWrapperTest extends AbstractTest {
@Test
public void testRunExecuteFixedNumber() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final IRichSpout spout = mock(IRichSpout.class);
final int numberOfCalls = this.r.nextInt(50);
final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
spoutWrapper.run(null);
verify(spout, times(numberOfCalls)).nextTuple();
}
@Test
public void testRunExecute() throws Exception {
final int numberOfCalls = this.r.nextInt(50);
final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
for(int i = numberOfCalls - 1; i >= 0; --i) {
expectedResult.add(new Tuple1<Integer>(new Integer(i)));
}
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
spout);
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
final TestCollector collector = new TestCollector();
spoutWrapper.run(collector);
Assert.assertEquals(expectedResult, collector.result);
}
@Test
public void testCancel() throws Exception {
final int numberOfCalls = 5 + this.r.nextInt(5);
final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
spout);
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
spoutWrapper.cancel();
final TestCollector collector = new TestCollector();
spoutWrapper.run(collector);
Assert.assertEquals(expectedResult, collector.result);
}
@Test
public void testClose() throws Exception {
final IRichSpout spout = mock(IRichSpout.class);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
spout);
spoutWrapper.close();
verify(spout).close();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;
import java.util.ArrayList;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class StormOutputFieldsDeclarerTest extends AbstractTest {
@Test
public void testDeclare() {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
Assert.assertEquals(-1, declarer.getNumberOfAttributes());
final int numberOfAttributes = 1 + this.r.nextInt(25);
final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
for(int i = 0; i < numberOfAttributes; ++i) {
schema.add("a" + i);
}
declarer.declare(new Fields(schema));
Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes());
}
public void testDeclareDirect() {
new StormOutputFieldsDeclarer().declare(false, null);
}
@Test(expected = UnsupportedOperationException.class)
public void testDeclareDirectFail() {
new StormOutputFieldsDeclarer().declare(true, null);
}
public void testDeclareStream() {
new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null);
}
@Test(expected = UnsupportedOperationException.class)
public void testDeclareStreamFail() {
new StormOutputFieldsDeclarer().declareStream(null, null);
}
public void testDeclareFullStream() {
new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null);
}
@Test(expected = UnsupportedOperationException.class)
public void testDeclareFullStreamFailNonDefaultStream() {
new StormOutputFieldsDeclarer().declareStream(null, false, null);
}
@Test(expected = UnsupportedOperationException.class)
public void testDeclareFullStreamFailDirect() {
new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.util.LinkedList;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import backtype.storm.topology.IRichSpout;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormSpoutWrapperTest extends AbstractTest {
@Test
public void testRunExecuteCancelInfinite() throws Exception {
final int numberOfCalls = 5 + this.r.nextInt(5);
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
spoutWrapper.cancel();
final TestCollector collector = new TestCollector();
spoutWrapper.run(collector);
Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
}
@Test
public void testClose() throws Exception {
final IRichSpout spout = mock(IRichSpout.class);
final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
spoutWrapper.close();
verify(spout).close();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
public class StormTupleTest extends AbstractTest {
@Test
public void nonTupleTest() {
final Object flinkTuple = new Integer(this.r.nextInt());
final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple);
Assert.assertSame(flinkTuple, tuple.getValue(0));
final List<Object> values = tuple.getValues();
Assert.assertEquals(1, values.size());
Assert.assertEquals(flinkTuple, values.get(0));
}
@Test
public void tupleTest() throws InstantiationException, IllegalAccessException {
final int numberOfAttributes = 1 + this.r.nextInt(25);
final Object[] data = new Object[numberOfAttributes];
final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
for(int i = 0; i < numberOfAttributes; ++i) {
data[i] = new Integer(this.r.nextInt());
flinkTuple.setField(data[i], i);
}
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
final List<Object> values = tuple.getValues();
Assert.assertEquals(numberOfAttributes, values.size());
for(int i = 0; i < numberOfAttributes; ++i) {
Assert.assertEquals(flinkTuple.getField(i), values.get(i));
}
Assert.assertEquals(numberOfAttributes, tuple.size());
}
@Test
public void testBinary() {
final byte[] data = new byte[this.r.nextInt(15)];
this.r.nextBytes(data);
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
}
@Test
public void testBoolean() {
final Boolean flinkTuple = new Boolean(this.r.nextBoolean());
final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple);
Assert.assertEquals(flinkTuple, tuple.getBoolean(0));
}
@Test
public void testByte() {
final Byte flinkTuple = new Byte((byte)this.r.nextInt());
final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple);
Assert.assertEquals(flinkTuple, tuple.getByte(0));
}
@Test
public void testDouble() {
final Double flinkTuple = new Double(this.r.nextDouble());
final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple);
Assert.assertEquals(flinkTuple, tuple.getDouble(0));
}
@Test
public void testFloat() {
final Float flinkTuple = new Float(this.r.nextFloat());
final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple);
Assert.assertEquals(flinkTuple, tuple.getFloat(0));
}
@Test
public void testInteger() {
final Integer flinkTuple = new Integer(this.r.nextInt());
final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple);
Assert.assertEquals(flinkTuple, tuple.getInteger(0));
}
@Test
public void testLong() {
final Long flinkTuple = new Long(this.r.nextInt());
final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple);
Assert.assertEquals(flinkTuple, tuple.getLong(0));
}
@Test
public void testShort() {
final Short flinkTuple = new Short((short)this.r.nextInt());
final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple);
Assert.assertEquals(flinkTuple, tuple.getShort(0));
}
@Test
public void testString() {
final byte[] data = new byte[this.r.nextInt(15)];
this.r.nextBytes(data);
final String flinkTuple = new String(data);
final StormTuple<String> tuple = new StormTuple<String>(flinkTuple);
Assert.assertEquals(flinkTuple, tuple.getString(0));
}
@Test
public void testBinaryTuple() {
final byte[] data = new byte[this.r.nextInt(15)];
this.r.nextBytes(data);
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
}
@Test
public void testBooleanTuple() {
final Boolean data = new Boolean(this.r.nextBoolean());
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getBoolean(index));
}
@Test
public void testByteTuple() {
final Byte data = new Byte((byte)this.r.nextInt());
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getByte(index));
}
@Test
public void testDoubleTuple() {
final Double data = new Double(this.r.nextDouble());
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getDouble(index));
}
@Test
public void testFloatTuple() {
final Float data = new Float(this.r.nextFloat());
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getFloat(index));
}
@Test
public void testIntegerTuple() {
final Integer data = new Integer(this.r.nextInt());
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getInteger(index));
}
@Test
public void testLongTuple() {
final Long data = new Long(this.r.nextInt());
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getLong(index));
}
@Test
public void testShortTuple() {
final Short data = new Short((short)this.r.nextInt());
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getShort(index));
}
@Test
public void testStringTuple() {
final byte[] rawdata = new byte[this.r.nextInt(15)];
this.r.nextBytes(rawdata);
final String data = new String(rawdata);
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index));
}
@Test(expected = UnsupportedOperationException.class)
public void testContains() {
new StormTuple<Object>(null).contains(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetFields() {
new StormTuple<Object>(null).getFields();
}
@Test(expected = UnsupportedOperationException.class)
public void testFieldIndex() {
new StormTuple<Object>(null).fieldIndex(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testSelect() {
new StormTuple<Object>(null).select(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetValueByField() {
new StormTuple<Object>(null).getValueByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetStringByField() {
new StormTuple<Object>(null).getStringByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetIntegerByField() {
new StormTuple<Object>(null).getIntegerByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetLongByField() {
new StormTuple<Object>(null).getLongByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetBooleanByField() {
new StormTuple<Object>(null).getBooleanByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetShortByField() {
new StormTuple<Object>(null).getShortByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetByteByField() {
new StormTuple<Object>(null).getByteByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetDoubleByField() {
new StormTuple<Object>(null).getDoubleByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetFloatByField() {
new StormTuple<Object>(null).getFloatByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetBinaryByField() {
new StormTuple<Object>(null).getBinaryByField(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceGlobalStreamid() {
new StormTuple<Object>(null).getSourceGlobalStreamid();
}
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceComponent() {
new StormTuple<Object>(null).getSourceComponent();
}
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceTask() {
new StormTuple<Object>(null).getSourceTask();
}
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceStreamId() {
new StormTuple<Object>(null).getSourceStreamId();
}
@Test(expected = UnsupportedOperationException.class)
public void testGetMessageId() {
new StormTuple<Object>(null).getMessageId();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;
import static org.mockito.Mockito.mock;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import backtype.storm.topology.IComponent;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormWrapperSetupHelperTest extends AbstractTest {
@Test(expected = IllegalArgumentException.class)
public void testZeroAttributesDeclarerBolt() throws Exception {
IComponent boltOrSpout;
if(this.r.nextBoolean()) {
boltOrSpout = mock(IRichSpout.class);
} else {
boltOrSpout = mock(IRichBolt.class);
}
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields());
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, this.r.nextBoolean());
}
@Test
public void testEmptyDeclarerBolt() {
IComponent boltOrSpout;
if(this.r.nextBoolean()) {
boltOrSpout = mock(IRichSpout.class);
} else {
boltOrSpout = mock(IRichBolt.class);
}
Assert.assertEquals(-1, StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, this.r.nextBoolean()));
}
@Test(expected = IllegalArgumentException.class)
public void testRawType() throws Exception {
IComponent boltOrSpout;
if(this.r.nextBoolean()) {
boltOrSpout = mock(IRichSpout.class);
} else {
boltOrSpout = mock(IRichBolt.class);
}
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy1", "dummy2"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, true);
}
@Test(expected = IllegalArgumentException.class)
public void testToManyAttributes() throws Exception {
IComponent boltOrSpout;
if(this.r.nextBoolean()) {
boltOrSpout = mock(IRichSpout.class);
} else {
boltOrSpout = mock(IRichBolt.class);
}
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
final String[] schema = new String[26];
for(int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, false);
}
@Test
public void testTupleTypes() throws Exception {
for(int i = 0; i < 26; ++i) {
this.testTupleTypes(i);
}
}
private void testTupleTypes(final int numberOfAttributes) throws Exception {
String[] schema = new String[numberOfAttributes];
if(numberOfAttributes == 0) {
schema = new String[1];
}
for(int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
IComponent boltOrSpout;
if(this.r.nextBoolean()) {
boltOrSpout = mock(IRichSpout.class);
} else {
boltOrSpout = mock(IRichBolt.class);
}
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, numberOfAttributes == 0);
}
// @Test
// public void testConvertToTopologyContext() {
// Assert.fail();
// }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册