diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml index 590f33da986e8fbd106d13719b4ba5c3e8bc19ea..0ac49dbde5fa08354e2f8af5ec88da99fe7e526b 100644 --- a/flink-contrib/flink-storm/pom.xml +++ b/flink-contrib/flink-storm/pom.xml @@ -61,18 +61,40 @@ under the License. log4j-over-slf4j - logback-classic ch.qos.logback + logback-classic + + + ring + ring-core + + + ring + ring-devel + + + ring + ring-servlet + + + ring + ring-jetty-adapter + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + org.jgrapht + jgrapht-core - - com.google.guava - guava - ${guava.version} - - @@ -85,51 +107,4 @@ under the License. - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - - - - - - - - org.eclipse.m2e - lifecycle-mapping - 1.0.0 - - - - - - org.apache.maven.plugins - maven-dependency-plugin - [2.9,) - - unpack - - - - - - - - - - - - - - - diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index 9628bb7e7fd047fc1c0396aae010193cd219c7e5..f4bcfb711c5f8d4505c2f8f532dedbd6f473d446 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -22,6 +22,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.pattern.Patterns; import akka.util.Timeout; + import backtype.storm.Config; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; @@ -32,7 +33,6 @@ import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; import com.esotericsoftware.kryo.Serializer; -import com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; @@ -52,6 +52,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; import org.apache.flink.storm.util.StormConfig; import org.apache.flink.streaming.api.graph.StreamGraph; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -215,7 +216,7 @@ public class FlinkClient { try { ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader( - Lists.newArrayList(uploadedJarUrl), + Collections.singletonList(uploadedJarUrl), Collections.emptyList(), this.getClass().getClassLoader()); client.runDetached(jobGraph, classLoader); diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java index 5311cb3e3ce949ace9069ee1b3ca38995b64cae4..6e316e7e75044f002079837a4c0401cc2520633a 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; import backtype.storm.generated.GlobalStreamId; @@ -26,8 +27,6 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.MessageId; import backtype.storm.utils.Utils; -import com.google.common.collect.Sets; - import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple0; @@ -44,6 +43,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import static java.util.Arrays.asList; + /** * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program. * It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can @@ -135,9 +136,9 @@ public class BoltWrapper extends AbstractStreamOperator implements * {@code rawOuput} is {@code false} and the number of declared output attributes is not within range * [1;25]. */ - public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) + public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) throws IllegalArgumentException { - this(bolt, null, Sets.newHashSet(rawOutputs)); + this(bolt, null, asList(rawOutputs)); } /** @@ -157,8 +158,7 @@ public class BoltWrapper extends AbstractStreamOperator implements * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range * [1;25]. */ - public BoltWrapper(final IRichBolt bolt, final Collection rawOutputs) - throws IllegalArgumentException { + public BoltWrapper(final IRichBolt bolt, final Collection rawOutputs) throws IllegalArgumentException { this(bolt, null, rawOutputs); } @@ -181,9 +181,12 @@ public class BoltWrapper extends AbstractStreamOperator implements * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range * [0;25]. */ - public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, final String[] rawOutputs) + public BoltWrapper( + final IRichBolt bolt, + final Fields inputSchema, + final String[] rawOutputs) throws IllegalArgumentException { - this(bolt, inputSchema, Sets.newHashSet(rawOutputs)); + this(bolt, inputSchema, asList(rawOutputs)); } /** diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java index db1d147921a2fcfff605b9d01c8d5a8ea7b9ca19..52d39a7e762f721aaca77e9543686333f005e972 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java @@ -120,9 +120,8 @@ final class FlinkTopologyContext extends TopologyContext { * @throws UnsupportedOperationException * at every invocation */ - @SuppressWarnings("unchecked") @Override - public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) { + public T registerMetric(final String name, final T metric, final int timeBucketSizeInSecs) { throw new UnsupportedOperationException("Metrics are not supported by Flink"); } diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java index 89defdee29c5766dceea0d29b85b80dbd1778fc3..7a3b6d5b541837188b22daaada2693eda66b99da 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java @@ -14,10 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; import backtype.storm.topology.IRichBolt; -import com.google.common.collect.Sets; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; @@ -26,6 +26,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Collection; +import static java.util.Arrays.asList; + /** * A {@link MergedInputsBoltWrapper} is a {@link BoltWrapper} that expects input tuples of type {@link StormTuple}. It * can be used to wrap a multi-input bolt and assumes that all input stream got merged into a {@link StormTuple} stream @@ -67,7 +69,7 @@ public final class MergedInputsBoltWrapper extends BoltWrapper extends RichParallelSourceFunction imp */ public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs) throws IllegalArgumentException { - this(spout, Sets.newHashSet(rawOutputs), null); + this(spout, asList(rawOutputs), null); } /** @@ -147,7 +147,7 @@ public final class SpoutWrapper extends RichParallelSourceFunction imp */ public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs, final Integer numberOfInvocations) throws IllegalArgumentException { - this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations); + this(spout, asList(rawOutputs), numberOfInvocations); } /** diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java index 82b12d6fafec81420df6bfea24101dcd7b694118..000fe848e8863fa8c4377ce0cf12d10da168ccfd 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; import backtype.storm.Config; @@ -27,16 +28,18 @@ import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; -import com.google.common.collect.Sets; + import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.TestDummyBolt; import org.apache.flink.storm.util.TestDummySpout; import org.apache.flink.storm.util.TestSink; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; + import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -48,6 +51,8 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import static java.util.Collections.singleton; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -85,7 +90,7 @@ public class WrapperSetupHelperTest extends AbstractTest { PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); WrapperSetupHelper.getNumberOfAttributes(boltOrSpout, - Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID })); + new HashSet(singleton(Utils.DEFAULT_STREAM_ID))); } @Test(expected = IllegalArgumentException.class) @@ -143,8 +148,7 @@ public class WrapperSetupHelperTest extends AbstractTest { Assert.assertEquals(attributes, WrapperSetupHelper.getNumberOfAttributes( boltOrSpout, - numberOfAttributes == -1 ? Sets - .newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null)); + numberOfAttributes == -1 ? new HashSet(singleton(Utils.DEFAULT_STREAM_ID)) : null)); } @Test