提交 a1423b56 编写于 作者: wu-sheng's avatar wu-sheng

Finish graph APIs for streaming module.

上级 e0338baf
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author peng-yongsheng, wu-sheng
*/
public final class Graph<Input> {
private int id;
private Node startNode;
private ConcurrentHashMap<Integer, Node> nodeIndex = new ConcurrentHashMap<>();
Graph(int id) {
this.id = id;
}
public void start(Input input) {
startNode.execute(input);
}
public <Output> Node<Input, Output> addNode(NodeHandler<Input, Output> nodeHandler) {
synchronized (this) {
startNode = new Node(this, nodeHandler);
return startNode;
}
}
public Next findNext(int handlerId) {
Node node = nodeIndex.get(handlerId);
if (node == null) {
throw new NodeNotFoundException("Can't find node with handlerId="
+ handlerId
+ " in graph[" + id + "】");
}
return node.getNext();
}
void checkForNewNode(Node node) {
int nodeId = node.getHandler().id();
if (nodeIndex.containsKey(nodeId)) {
throw new PotentialAcyclicGraphException("handler="
+ node.getHandler().getClass().getName()
+ " already exists in graph[" + id + "】");
}
nodeIndex.put(nodeId, node);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
import java.util.HashMap;
import java.util.Map;
/**
* @author wusheng
*/
public enum GraphManager {
INSTANCE;
private Map<Integer, Graph> allGraphs = new HashMap<>();
/**
* Create a stream process graph.
*
* @param graphId represents a graph, which is used for finding it.
* @return
*/
public synchronized <Input> Graph<Input> createIfAbsent(int graphId, Class<Input> input) {
if (!allGraphs.containsKey(graphId)) {
Graph graph = new Graph(graphId);
allGraphs.put(graphId, graph);
return graph;
} else {
return allGraphs.get(graphId);
}
}
public Graph findGraph(int graphId) {
Graph graph = allGraphs.get(graphId);
if (graph == null) {
throw new GraphNotFoundException("Graph id=" + graphId + " not found in this GraphManager");
}
return graph;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public class GraphNotFoundException extends RuntimeException {
public GraphNotFoundException(String message) {
super(message);
}
}
......@@ -16,28 +16,34 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
package org.skywalking.apm.collector.core.graph;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.framework.Executor;
/**
* @author peng-yongsheng
* The <code>Next</code> is a delegate object for the following {@link Node}.
*
* @author peng-yongsheng, wu-sheng
*/
public class Next<Input extends Data> implements Executor<Input> {
public class Next<Input> implements Executor<Input> {
private final List<Node> nextNodes;
public Next() {
this.nextNodes = new ArrayList<>();
this.nextNodes = new LinkedList<>();
}
public final void addNext(Node node) {
nextNodes.add(node);
}
/**
* Drive to the next nodes
*
* @param input
*/
@Override public void execute(Input input) {
nextNodes.forEach(node -> node.execute(input));
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
/**
* The <code>Node</code> in the graph with explicit Input and Output types.
*
* @author peng-yongsheng, wu-sheng
*/
public final class Node<Input, Output> {
private final NodeHandler nodeHandler;
private final Next<Output> next;
private final Graph graph;
Node(Graph graph, NodeHandler<Input, Output> nodeHandler) {
this.graph = graph;
this.nodeHandler = nodeHandler;
this.next = new Next<>();
this.graph.checkForNewNode(this);
}
public final <NextOutput> Node<Output, NextOutput> addNext(NodeHandler<Output, NextOutput> nodeHandler) {
synchronized (graph) {
Node<Output, NextOutput> node = new Node<>(graph, nodeHandler);
next.addNext(node);
return node;
}
}
final void execute(Input input) {
nodeHandler.process(input, next);
}
NodeHandler getHandler() {
return nodeHandler;
}
Next<Output> getNext() {
return next;
}
}
......@@ -16,13 +16,18 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
import org.skywalking.apm.collector.core.data.Data;
package org.skywalking.apm.collector.core.graph;
/**
* @author peng-yongsheng
* @author peng-yongsheng, wu-sheng
*/
public interface Aggregator<Input extends Data, Output extends Data> {
public interface NodeHandler<Input, Output> {
/**
* The unique id in the certain graph.
*
* @return id
*/
int id();
void process(Input input, Next<Output> next);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public class NodeNotFoundException extends RuntimeException {
public NodeNotFoundException(String message) {
super(message);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public class PotentialAcyclicGraphException extends RuntimeException {
public PotentialAcyclicGraphException(String message) {
super(message);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author wusheng
*/
public class GraphManagerTest {
private static PrintStream OUT_REF;
private ByteArrayOutputStream outputStream;
@Before
public void initAndHoldOut() {
OUT_REF = System.out;
outputStream = new ByteArrayOutputStream();
PrintStream testStream = new PrintStream(outputStream);
System.setOut(testStream);
}
@After
public void reset() {
System.setOut(OUT_REF);
}
@Test
public void testGraph() {
Graph<String> testGraph = GraphManager.INSTANCE.createIfAbsent(1, String.class);
Node<String, String> node = testGraph.addNode(new Node1Handler());
Node<String, Integer> node1 = node.addNext(new Node2Handler());
testGraph.start("Input String");
String output = outputStream.toString();
String expected = "Node1 process: s=Input String\n" +
"Node2 process: s=Input String\n";
Assert.assertEquals(expected, output);
}
@Test
public void testGraphWithChainStyle() {
Graph<String> graph = GraphManager.INSTANCE.createIfAbsent(2, String.class);
graph.addNode(new Node1Handler()).addNext(new Node2Handler()).addNext(new Node4Handler());
graph.start("Input String");
String output = outputStream.toString();
String expected = "Node1 process: s=Input String\n" +
"Node2 process: s=Input String\n" +
"Node4 process: int=123\n";
Assert.assertEquals(expected, output);
}
@Test(expected = PotentialAcyclicGraphException.class)
public void testPotentialAcyclicGraph() {
Graph<String> testGraph = GraphManager.INSTANCE.createIfAbsent(3, String.class);
Node<String, String> node = testGraph.addNode(new Node1Handler());
node.addNext(new Node1Handler());
}
@Test
public void testContinueStream() {
Graph<String> graph = GraphManager.INSTANCE.createIfAbsent(4, String.class);
graph.addNode(new Node1Handler()).addNext(new Node2Handler()).addNext(new Node4Handler());
Next next = GraphManager.INSTANCE.findGraph(4).findNext(2);
next.execute(123);
String output = outputStream.toString();
String expected =
"Node4 process: int=123\n";
Assert.assertEquals(expected, output);
}
@Test(expected = NodeNotFoundException.class)
public void handlerNotFound() {
Graph<String> graph = GraphManager.INSTANCE.createIfAbsent(5, String.class);
graph.addNode(new Node1Handler()).addNext(new Node2Handler()).addNext(new Node4Handler());
Next next = GraphManager.INSTANCE.findGraph(5).findNext(3);
}
}
......@@ -16,23 +16,18 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
import org.skywalking.apm.collector.core.data.Data;
package org.skywalking.apm.collector.core.graph;
/**
* @author peng-yongsheng
* @author wusheng
*/
public class StreamGraph {
private Node startNode;
public void start(Data input) {
startNode.execute(input);
public class Node1Handler implements NodeHandler<String, String> {
@Override public int id() {
return 1;
}
public Node addNode(Aggregator aggregator) {
startNode = new Node(aggregator);
return startNode;
@Override public void process(String s, Next<String> next) {
System.out.println("Node1 process: s=" + s);
next.execute(s);
}
}
......@@ -16,30 +16,18 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
import org.skywalking.apm.collector.core.data.Data;
package org.skywalking.apm.collector.core.graph;
/**
* @author peng-yongsheng
* @author wusheng
*/
public class Node<Input extends Data> {
private final Aggregator aggregator;
private final Next<Input> next;
public Node(Aggregator aggregator) {
this.aggregator = aggregator;
this.next = new Next<>();
}
public final Node addNext(Aggregator aggregator) {
Node node = new Node(aggregator);
next.addNext(node);
return node;
public class Node2Handler implements NodeHandler<String, Integer> {
@Override public int id() {
return 2;
}
final void execute(Input input) {
aggregator.process(input, next);
@Override public void process(String s, Next<Integer> next) {
System.out.println("Node2 process: s=" + s);
next.execute(123);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public class Node3Handler implements NodeHandler<Long, Long> {
@Override public int id() {
return 3;
}
@Override
public void process(Long aLong, Next<Long> next) {
System.out.println("Node3 process: long=" + aLong);
next.execute(aLong);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public class Node4Handler implements NodeHandler<Integer, Long> {
@Override public int id() {
return 4;
}
@Override
public void process(Integer in, Next<Long> next) {
System.out.println("Node4 process: int=" + in);
next.execute(new Long(in.intValue()));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册