提交 038811a1 编写于 作者: A ascrutae

fix:#79

上级 baeda17b
......@@ -15,7 +15,7 @@
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<nanohttpd.version>2.2.0</nanohttpd.version>
</properties>
<dependencies>
......@@ -28,7 +28,30 @@
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-util</artifactId>
<version>2.1-2017</version>
<version>${project.parent.version}</version>
</dependency>
<!-- nano httpd -->
<dependency>
<groupId>org.nanohttpd</groupId>
<artifactId>nanohttpd</artifactId>
<version>${nanohttpd.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
</dependencies>
......
......@@ -9,6 +9,7 @@ import com.a.eye.skywalking.registry.api.RegistryCenter;
import com.a.eye.skywalking.registry.assist.NetUtils;
import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.http.RestfulAPIService;
import com.a.eye.skywalking.routing.listener.SpanStorageListenerImpl;
import com.a.eye.skywalking.routing.listener.TraceSearchListenerImpl;
import com.a.eye.skywalking.routing.router.RoutingService;
......@@ -23,8 +24,9 @@ import java.util.Properties;
* It starts server in a sequence:
* 1. init config
* 2. init logManager
* 3. registry server
* 4. open service, and start listening port.
* 3. start restful service
* 4. registry server
* 5. open service, and start listening port.
*
* @author wusheng
*/
......@@ -33,10 +35,13 @@ public class Main {
private static final ILog logger = LogManager.getLogger(Main.class);
public static void main(String[] args) {
RestfulAPIService restfulAPIService = null;
try {
initConfig();
LogManager.setLogResolver(new Log4j2Resolver());
restfulAPIService = new RestfulAPIService(Config.Server.REST_SERVICE_HOST,Config.Server
.REST_SERVICE_PORT);
restfulAPIService.doStart();
RegistryCenter center = RegistryCenterFactory.INSTANCE.getRegistryCenter(Config.RegistryCenter.TYPE);
center.start(fetchRegistryCenterConfig());
center.subscribe(Config.StorageNode.SUBSCRIBE_PATH, RoutingService.getRouter());
......@@ -47,10 +52,10 @@ public class Main {
logger.info("Skywalking routing service was started.");
Thread.currentThread().join();
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to start routing service.", e);
System.exit(-1);
} finally {
restfulAPIService.doStop();
RoutingService.stop();
}
}
......
......@@ -3,6 +3,10 @@ package com.a.eye.skywalking.routing.config;
public class Config {
public static class Server {
public static int PORT = 23000;
public static String REST_SERVICE_HOST = "0.0.0.0";
public static int REST_SERVICE_PORT = 23100;
}
......
package com.a.eye.skywalking.routing.http;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.dependencies.com.google.gson.Gson;
import com.a.eye.skywalking.routing.http.module.ResponseMessage;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import fi.iki.elonen.NanoHTTPD;
import static com.a.eye.skywalking.routing.http.module.ResponseMessage.REQUEST_METHOD_NOT_SUPPORT;
import static com.a.eye.skywalking.routing.http.module.ResponseMessage.SERVER_ERROR;
public class RestfulAPIService extends NanoHTTPD {
private static final SpanStorageController spanController = new SpanStorageController();
public static final String JSON_MIME_TYPE = "application/json";
private ILog logger = LogManager.getLogger(RestfulAPIService.class);
public RestfulAPIService(String host, int port) {
super(host, port);
}
public void doStart() throws IOException {
try {
start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
logger.info("Restful api service is up.\n");
} catch (IOException e) {
logger.error("Failed to start service.", e);
throw e;
}
}
@Override
public Response serve(IHTTPSession session) {
if (session.getMethod() != Method.POST) {
return newFixedLengthResponse(Response.Status.OK, JSON_MIME_TYPE,
new Gson().toJson(REQUEST_METHOD_NOT_SUPPORT));
}
ResponseMessage responseMessage = ResponseMessage.URL_NOT_FOUND;
try {
String postData = getPostData(session);
if (spanController.isAddAckSpanURI(session.getUri())) {
responseMessage = spanController.addAckSpans(postData);
}
if (spanController.isAddRequestSpanURI(session.getUri())) {
responseMessage = spanController.addRequestSpans(postData);
}
} catch (Throwable e) {
logger.error("server error.", e);
responseMessage = SERVER_ERROR;
}
return newFixedLengthResponse(Response.Status.OK, JSON_MIME_TYPE, new Gson().toJson
(responseMessage));
}
/**
* Get the post data from request
*/
private String getPostData(IHTTPSession session) throws IOException, ResponseException {
Map<String, String> parameters = new HashMap<String, String>();
session.parseBody(parameters);
return parameters.get("postData");
}
public void doStop() {
stop();
}
}
package com.a.eye.skywalking.routing.http;
import com.google.gson.reflect.TypeToken;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.dependencies.com.google.gson.Gson;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.listener.server.SpanStorageServerListener;
import com.a.eye.skywalking.routing.http.module.AckSpanModule;
import com.a.eye.skywalking.routing.http.module.RequestSpanModule;
import com.a.eye.skywalking.routing.http.module.ResponseMessage;
import com.a.eye.skywalking.routing.listener.SpanStorageListenerImpl;
import java.util.List;
import static com.a.eye.skywalking.routing.http.module.ResponseMessage.OK;
public class SpanStorageController {
private static String ADD_REQUEST_SPAN_URI = "/spans/request";
private static String ADD_ACK_SPANS_URI = "/spans/ack";
private SpanStorageServerListener spanStorageServerListener = new SpanStorageListenerImpl();
/**
* add request spans
*
* @param jsonData the json data of span
*/
public ResponseMessage addRequestSpans(String jsonData) {
List<RequestSpanModule> requestSpanModules = new Gson().fromJson(jsonData,
new TypeToken<List<RequestSpanModule>>() {
}.getType());
for (RequestSpanModule span : requestSpanModules) {
RequestSpan requestSpan = span.convertToGRPCModule();
if (requestSpan != null) {
spanStorageServerListener.storage(requestSpan);
}
}
return OK;
}
/**
* add ack spans
*
* @param jsonData the json data of span
*/
public ResponseMessage addAckSpans(String jsonData) {
List<AckSpanModule> requestSpanModules = new Gson().fromJson(jsonData,
new TypeToken<List<AckSpanModule>>() {
}.getType());
for (AckSpanModule span : requestSpanModules) {
AckSpan ackSpan = span.convertToGRPCModule();
if (ackSpan != null) {
spanStorageServerListener.storage(ackSpan);
}
}
return OK;
}
public boolean isAddRequestSpanURI(String uri){
return ADD_REQUEST_SPAN_URI.equals(uri);
}
public boolean isAddAckSpanURI(String uri){
return ADD_ACK_SPANS_URI.equals(uri);
}
}
package com.a.eye.skywalking.routing.http.module;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.util.StringUtil;
import com.a.eye.skywalking.util.TraceIdUtil;
import java.util.HashMap;
import java.util.Map;
/**
* Ack span module
*/
public class AckSpanModule {
private String traceId;
private String parentLevelId = "";
private int levelId = 0;
private long cost;
private int routeKey;
private Map<String, String> tags;
public AckSpan convertToGRPCModule() {
if (illegalAckSpan()) {
return null;
}
return AckSpan.newBuilder().putAllTags(tags).setLevelId(levelId).setParentLevel
(parentLevelId).setRouteKey(routeKey).setCost(cost)
.setTraceId(TraceIdUtil.toTraceId(traceId)).build();
}
private boolean illegalAckSpan() {
if (StringUtil.isEmpty(traceId)) {
return true;
}
if (tags.isEmpty()){
return true;
}
return false;
}
}
package com.a.eye.skywalking.routing.http.module;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.util.StringUtil;
import com.a.eye.skywalking.util.TraceIdUtil;
import java.util.HashMap;
import java.util.Map;
/**
* request span module
*/
public class RequestSpanModule {
private String traceId;
private String parentLevelId = "";
private int levelId;
private long startTime;
private int routeKey;
private Map<String, String> tags;
public RequestSpan convertToGRPCModule() {
if (illegalRequestSpan()) {
return null;
}
return RequestSpan.newBuilder().putAllTags(tags).setLevelId(levelId).setParentLevel
(parentLevelId).setRouteKey(routeKey).setStartDate(startTime)
.setTraceId(TraceIdUtil.toTraceId(traceId)).build();
}
private boolean illegalRequestSpan() {
if (StringUtil.isEmpty(traceId)) {
return true;
}
if (tags.isEmpty()){
return true;
}
return false;
}
}
package com.a.eye.skywalking.routing.http.module;
import com.a.eye.skywalking.network.dependencies.com.google.gson.Gson;
public class ResponseMessage {
public static final ResponseMessage OK = new ResponseMessage(200, "Store success");
public static final ResponseMessage REQUEST_METHOD_NOT_SUPPORT = new ResponseMessage(403, "Request method " +
"not support");
public static final ResponseMessage SERVER_ERROR = new ResponseMessage(500, "Server error");
public static final ResponseMessage URL_NOT_FOUND = new ResponseMessage(404, "Not found");
/**
* Response code:
* 200 -- store success
* 403 -- request method not support
* 500 -- server error
* 404 -- not found
*/
private int code;
private String message;
ResponseMessage(int code, String message) {
this.code = code;
this.message = message;
}
public int getCode() {
return code;
}
}
# the port which routing server listening
server.port=23000
# the ip that rest api service binding
server.rest_service_host=0.0.0.0
# the port which rest api service listening
server.rest_service_port=23100
#
#search.check_cycle=100
#
......
package com.a.eye.skywalking.routing.http;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.entity.EntityBuilder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.Charset;
/**
* Created by xin on 2017/1/10.
*/
public class HttpClientUtil {
public static ResponseResult doPost(String url, String bodyJson) throws IOException {
CloseableHttpClient client = HttpClients.createDefault();
HttpPost httpPost = new HttpPost(url);
EntityBuilder entityBuilder = EntityBuilder.create()
.setText(bodyJson)
.setContentType(ContentType.APPLICATION_JSON.withCharset("utf-8"));
httpPost.setEntity(entityBuilder.build());
CloseableHttpResponse response = client.execute(httpPost);
response.getStatusLine().getStatusCode();
StringWriter writer = new StringWriter();
IOUtils.copy(response.getEntity().getContent(), writer, Charset.forName("UTF-8"));
return new ResponseResult(writer.toString(), response.getStatusLine().getStatusCode());
}
public static ResponseResult doGet(String url) throws IOException {
CloseableHttpClient client = HttpClients.createDefault();
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("Accept", "application/json");
httpGet.setHeader("Content-type", "application/json");
CloseableHttpResponse response = client.execute(httpGet);
response.getStatusLine().getStatusCode();
StringWriter writer = new StringWriter();
IOUtils.copy(response.getEntity().getContent(), writer, Charset.forName("UTF-8"));
return new ResponseResult(writer.toString(), response.getStatusLine().getStatusCode());
}
}
package com.a.eye.skywalking.routing.http;
import com.google.gson.Gson;
import com.a.eye.skywalking.routing.http.module.ResponseMessage;
/**
* Created by xin on 2017/1/10.
*/
public class ResponseResult {
private int statusCode;
private String responseBody;
public ResponseResult(String responseBody, int statusCode) {
this.statusCode = statusCode;
this.responseBody = responseBody;
}
public ResponseMessage getResponseMessage() {
return new Gson().fromJson(responseBody, ResponseMessage.class);
}
public int getStatusCode(){
return statusCode;
}
}
package com.a.eye.skywalking.routing.http;
import com.a.eye.skywalking.routing.http.module.ResponseMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class RestfulAPIServiceTest {
private final static int REST_SERVER_PORT = 54333;
private final static String REST_URL_PREFIX = "http://localhost:" + REST_SERVER_PORT;
private RestfulAPIService restfulAPIService;
@Before
public void setUp() throws Exception {
restfulAPIService = new RestfulAPIService("localhost", REST_SERVER_PORT);
restfulAPIService.doStart();
}
@Test
public void testRequestMethodWithGet() throws IOException {
ResponseResult responseResult = HttpClientUtil.doGet(REST_URL_PREFIX);
assertEquals(200, responseResult.getStatusCode());
ResponseMessage responseMessage = responseResult.getResponseMessage();
assertEquals(403, responseMessage.getCode());
}
@Test
public void testRequestMethodWithWrongURL() throws IOException {
ResponseResult responseResult = HttpClientUtil.doPost(REST_URL_PREFIX, "{}");
assertEquals(200, responseResult.getStatusCode());
ResponseMessage responseMessage = responseResult.getResponseMessage();
assertEquals(404, responseMessage.getCode());
}
@Test
public void testAddAckSpans() throws IOException {
ResponseResult responseResult = HttpClientUtil.doPost
(REST_URL_PREFIX + "/spans/ack", "[{\n" +
"\"traceId\":\"212017.1484100963000.-215172798.6571.52.2\",\n" +
"\"parentLevelId\":\"\",\n" +
"\"levelId\":0,\n" +
"\"cost\":14,\n" +
"\"routeKey\":123456,\n" +
"\"tags\": {\n" +
" \"viewpoint\":\"http://localhost:8080/skywalking/test\",\n" +
" \"error.status\":\"0\",\n" +
" \"applicationCode\":\"test\",\n" +
" \"username\":\"test\"\n" +
" }\n" +
"}]");
assertEquals(200, responseResult.getStatusCode());
ResponseMessage responseMessage = responseResult.getResponseMessage();
assertEquals(200, responseMessage.getCode());
}
@Test
public void testAddRequestSpans() throws IOException {
ResponseResult responseResult = HttpClientUtil.doPost
(REST_URL_PREFIX + "/spans/request", "[{\n" +
"\"traceId\":\"212017.1484100963000.-215172798.6571.52.2\",\n" +
"\"parentLevelId\":\"\",\n" +
"\"levelId\":0,\n" +
"\"startTime\":0,\n" +
"\"routeKey\":123456,\n" +
"\"tags\":{\n" +
" \"viewpoint\":\"http://localhost:8080/skywalking/test\",\n" +
" \"hostname\":\"192.168.1.1\",\n" +
" \"error.status\":\"0\",\n" +
" \"process_no\":\"123456\",\n" +
" \"applicationCode\":\"test\",\n" +
" \"call.desc\":\"W\",\n" +
" \"call.type\":\"S\",\n" +
" \"username\":\"test\"\n" +
" }\n" +
"}]");
assertEquals(200, responseResult.getStatusCode());
ResponseMessage responseMessage = responseResult.getResponseMessage();
assertEquals(200, responseMessage.getCode());
}
@Test
public void testAddWithErrorRequestSpanJson() throws IOException {
ResponseResult responseResult = HttpClientUtil.doPost
(REST_URL_PREFIX + "/spans/request", "{\n" +
"\"traceId\":\"212017.1484100963000.-215172798.6571.52.2\",\n" +
"\"parentLevelId\":\"\",\n" +
"\"levelId\":0,\n" +
"\"startTime\":0,\n" +
"\"routeKey\":123456,\n" +
"\"tags\":{\n" +
" \"viewpoint\":\"http://localhost:8080/skywalking/test\",\n" +
" \"hostname\":\"192.168.1.1\",\n" +
" \"error.status\":\"0\",\n" +
" \"process_no\":\"123456\",\n" +
" \"applicationCode\":\"test\",\n" +
" \"call.desc\":\"W\",\n" +
" \"call.type\":\"S\",\n" +
" \"username\":\"test\"\n" +
" }\n" +
"}");
assertEquals(200, responseResult.getStatusCode());
ResponseMessage responseMessage = responseResult.getResponseMessage();
assertEquals(500, responseMessage.getCode());
}
@After
public void tearDown() throws Exception {
restfulAPIService.doStop();
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册