提交 406c4f52 编写于 作者: wu-sheng's avatar wu-sheng

Add concept: WayToNode to define how to ask node to process.

上级 d44b7981
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public class DirectWay<INPUT, OUTPUT> extends WayToNode<INPUT, OUTPUT> {
public DirectWay(NodeProcessor<INPUT, OUTPUT> destinationHandler) {
super(destinationHandler);
}
@Override protected void in(INPUT o) {
out(o);
}
}
......@@ -29,14 +29,14 @@ import org.skywalking.apm.collector.core.framework.Executor;
*/
public class Next<INPUT> implements Executor<INPUT> {
private final List<Node> nextNodes;
private final List<WayToNode> ways;
public Next() {
this.nextNodes = new LinkedList<>();
this.ways = new LinkedList<>();
}
public final void addNext(Node node) {
nextNodes.add(node);
final void addWay(WayToNode way) {
ways.add(way);
}
/**
......@@ -45,6 +45,6 @@ public class Next<INPUT> implements Executor<INPUT> {
* @param INPUT
*/
@Override public void execute(INPUT INPUT) {
nextNodes.forEach(node -> node.execute(INPUT));
ways.forEach(way -> way.in(INPUT));
}
}
......@@ -36,10 +36,14 @@ public final class Node<INPUT, OUTPUT> {
}
public final <NEXTOUTPUT> Node<OUTPUT, NEXTOUTPUT> addNext(NodeProcessor<OUTPUT, NEXTOUTPUT> nodeProcessor) {
return this.addNext(new DirectWay(nodeProcessor));
}
public final <NEXTOUTPUT> Node<OUTPUT, NEXTOUTPUT> addNext(WayToNode<OUTPUT, NEXTOUTPUT> way) {
synchronized (graph) {
Node<OUTPUT, NEXTOUTPUT> node = new Node<>(graph, nodeProcessor);
next.addNext(node);
return node;
way.buildDestination(graph);
next.addWay(way);
return way.getDestination();
}
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public abstract class WayToNode<INPUT, OUTPUT> {
private Node destination;
private NodeProcessor<INPUT, OUTPUT> destinationHandler;
public WayToNode(NodeProcessor<INPUT, OUTPUT> destinationHandler) {
this.destinationHandler = destinationHandler;
}
void buildDestination(Graph graph) {
destination = new Node(graph, destinationHandler);
}
protected abstract void in(INPUT INPUT);
protected void out(INPUT INPUT) {
destination.execute(INPUT);
}
Node getDestination() {
return destination;
}
}
......@@ -114,6 +114,22 @@ public class GraphManagerTest {
foundNode.addNext(new Node4Processor());
}
@Test
public void testDeadEndWay() {
Graph<String> graph = GraphManager.INSTANCE.createIfAbsent(7, String.class);
graph.addNode(new Node1Processor()).addNext(new WayToNode<String, Integer>(new Node2Processor()) {
@Override protected void in(String INPUT) {
//don't call `out(intput)`;
}
});
graph.start("Input String");
String output = outputStream.toString();
String expected = "Node1 process: s=Input String" + lineSeparator;
Assert.assertEquals(expected, output);
}
@After
public void tearDown() {
GraphManager.INSTANCE.reset();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册