提交 33bd6586 编写于 作者: S simon

Using Jackson instead of Fastjson

上级 dea2d463
......@@ -52,6 +52,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task request command */public class TaskExecuteAckCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * startTime */ private Date startTime; /** * host */ private String host; /** * status */ private int status; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JacksonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task request command */public class TaskExecuteAckCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * startTime */ private Date startTime; /** * host */ private String host; /** * status */ private int status; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_ACK); byte[] body = JacksonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */public class TaskExecuteRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public TaskExecuteRequestCommand() { } public TaskExecuteRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JacksonSerializer; import java.io.Serializable; /** * execute task request command */public class TaskExecuteRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public TaskExecuteRequestCommand() { } public TaskExecuteRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); byte[] body = JacksonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class TaskExecuteResponseCommand implements Serializable { public TaskExecuteResponseCommand() { } public TaskExecuteResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; /** * processId */ private int processId; /** * appIds */ private String appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getAppIds() { return appIds; } public void setAppIds(String appIds) { this.appIds = appIds; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + ", processId=" + processId + ", appIds='" + appIds + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JacksonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class TaskExecuteResponseCommand implements Serializable { public TaskExecuteResponseCommand() { } public TaskExecuteResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; /** * processId */ private int processId; /** * appIds */ private String appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getAppIds() { return appIds; } public void setAppIds(String appIds) { this.appIds = appIds; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_RESPONSE); byte[] body = JacksonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + ", processId=" + processId + ", appIds='" + appIds + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */public class TaskKillRequestCommand implements Serializable { /** * task id */ private int taskInstanceId; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskInstanceId=" + taskInstanceId + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JacksonSerializer; import java.io.Serializable; /** * kill task request command */public class TaskKillRequestCommand implements Serializable { /** * task id */ private int taskInstanceId; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = JacksonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskInstanceId=" + taskInstanceId + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date;import java.util.List; /** * kill task response command */public class TaskKillResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List<String> appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List<String> getAppIds() { return appIds; } public void setAppIds(List<String> appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JacksonSerializer; import java.io.Serializable;import java.util.List; /** * kill task response command */public class TaskKillResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List<String> appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List<String> getAppIds() { return appIds; } public void setAppIds(List<String> appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_RESPONSE); byte[] body = JacksonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; }}
\ No newline at end of file
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import java.io.Serializable;
......@@ -56,7 +56,7 @@ public class GetLogBytesRequestCommand implements Serializable {
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.GET_LOG_BYTES_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JacksonSerializer.serialize(this);
command.setBody(body);
return command;
}
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import java.io.Serializable;
......@@ -57,7 +57,7 @@ public class GetLogBytesResponseCommand implements Serializable {
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JacksonSerializer.serialize(this);
command.setBody(body);
return command;
}
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import java.io.Serializable;
......@@ -84,7 +84,7 @@ public class RollViewLogRequestCommand implements Serializable {
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JacksonSerializer.serialize(this);
command.setBody(body);
return command;
}
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import java.io.Serializable;
......@@ -57,7 +57,7 @@ public class RollViewLogResponseCommand implements Serializable {
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JacksonSerializer.serialize(this);
command.setBody(body);
return command;
}
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import java.io.Serializable;
......@@ -56,7 +56,7 @@ public class ViewLogRequestCommand implements Serializable {
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JacksonSerializer.serialize(this);
command.setBody(body);
return command;
}
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import java.io.Serializable;
......@@ -57,7 +57,7 @@ public class ViewLogResponseCommand implements Serializable {
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JacksonSerializer.serialize(this);
command.setBody(body);
return command;
}
......
......@@ -16,12 +16,20 @@
*/
package org.apache.dolphinscheduler.remote.utils;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* json serialize or deserialize
*/
public class FastJsonSerializer {
public class JacksonSerializer {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger logger = LoggerFactory.getLogger(JacksonSerializer.class);
/**
* serialize to byte
......@@ -31,7 +39,13 @@ public class FastJsonSerializer {
* @return byte array
*/
public static <T> byte[] serialize(T obj) {
String json = JSON.toJSONString(obj);
String json = "";
try {
json = objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
logger.error("serializeToString exception!", e);
}
return json.getBytes(Constants.UTF8);
}
......@@ -42,7 +56,14 @@ public class FastJsonSerializer {
* @return string
*/
public static <T> String serializeToString(T obj) {
return JSON.toJSONString(obj);
String json = "";
try {
json = objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
logger.error("serializeToString exception!", e);
}
return json;
}
/**
......@@ -54,7 +75,15 @@ public class FastJsonSerializer {
* @return deserialize type
*/
public static <T> T deserialize(byte[] src, Class<T> clazz) {
return JSON.parseObject(src, clazz);
String json = new String(src, StandardCharsets.UTF_8);
try {
return objectMapper.readValue(json, clazz);
} catch (IOException e) {
logger.error("deserialize exception!", e);
return null;
}
}
}
......@@ -18,20 +18,20 @@
package org.apache.dolphinscheduler.remote;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import org.junit.Assert;
import org.junit.Test;
public class FastJsonSerializerTest {
public class JacksonSerializerTest {
@Test
public void testSerialize(){
TestObj testObj = new TestObj();
testObj.setAge(12);
byte[] serializeByte = FastJsonSerializer.serialize(testObj);
byte[] serializeByte = JacksonSerializer.serialize(testObj);
//
TestObj deserialize = FastJsonSerializer.deserialize(serializeByte, TestObj.class);
TestObj deserialize = JacksonSerializer.deserialize(serializeByte, TestObj.class);
Assert.assertEquals(testObj.getAge(), deserialize.getAge());
}
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import java.io.Serializable;
import java.util.Date;
......@@ -431,7 +431,7 @@ public class TaskExecutionContext implements Serializable{
public Command toCommand(){
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
requestCommand.setTaskExecutionContext(JacksonSerializer.serializeToString(this));
return requestCommand.convert2Command();
}
......
......@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -61,21 +61,21 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
final CommandType commandType = command.getType();
switch (commandType){
case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
GetLogBytesRequestCommand getLogRequest = JacksonSerializer.deserialize(
command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
ViewLogRequestCommand viewLogRequest = JacksonSerializer.deserialize(
command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath());
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break;
case ROLL_VIEW_LOG_REQUEST:
RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize(
RollViewLogRequestCommand rollViewLogRequest = JacksonSerializer.deserialize(
command.getBody(), RollViewLogRequestCommand.class);
List<String> lines = readPartFileContent(rollViewLogRequest.getPath(),
rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
......
......@@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
......@@ -77,7 +77,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);
TaskExecuteAckCommand taskAckCommand = JacksonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);
logger.info("taskAckCommand : {}", taskAckCommand);
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
......
......@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,7 +45,7 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
TaskKillResponseCommand responseCommand = JacksonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
logger.info("received task kill response command : {}", responseCommand);
}
......
......@@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
......@@ -78,7 +78,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);
TaskExecuteResponseCommand responseCommand = JacksonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", responseCommand);
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
......
......@@ -31,7 +31,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
......@@ -78,7 +78,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
TaskExecuteRequestCommand taskRequestCommand = JacksonSerializer.deserialize(
command.getBody(), TaskExecuteRequestCommand.class);
logger.info("received command : {}", taskRequestCommand);
......
......@@ -29,7 +29,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
......@@ -83,7 +83,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
TaskKillRequestCommand killCommand = JacksonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
logger.info("received kill command : {}", killCommand);
Pair<Boolean, List<String>> result = doKill(killCommand);
......
......@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -77,7 +77,7 @@ public class LogClientService {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){
RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
RollViewLogResponseCommand rollReviewLog = JacksonSerializer.deserialize(
response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg();
}
......@@ -105,7 +105,7 @@ public class LogClientService {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){
ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
ViewLogResponseCommand viewLog = JacksonSerializer.deserialize(
response.getBody(), ViewLogResponseCommand.class);
return viewLog.getMsg();
}
......@@ -133,7 +133,7 @@ public class LogClientService {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){
GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
GetLogBytesResponseCommand getLog = JacksonSerializer.deserialize(
response.getBody(), GetLogBytesResponseCommand.class);
return getLog.getData();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册