提交 fea00970 编写于 作者: O o2null

更新集群模块同步机制,可设置在汇报前进行健康检查.

上级 d94d50f6
......@@ -184,6 +184,7 @@
"nodeAgentEncrypt": true,
"quickStartWebApp": false,
"autoStart": true,
"selfHealthCheckEnable": false,
"###enable": "是否启用###",
"###isPrimaryCenter": "是否是center节点,仅允许存在一个center节点###",
"###center": "Center服务器配置###",
......@@ -202,5 +203,6 @@
"###quickStartWebApp": "是否使用快速应用部署###",
"###banner": "服务器控制台启动标识###",
"###autoStart": "是否自动启动###",
"###eraseContentEnable": "是否允许使用擦除数据功能###"
"###eraseContentEnable": "是否允许使用擦除数据功能###",
"###selfHealthCheckEnable": "是否启用节点上模块健康自检查,如果启用在提交到center之前将进行模块的健康检查.默认false###"
}
\ No newline at end of file
package com.x.base.core.project;
import java.util.Date;
import javax.servlet.ServletContext;
import com.x.base.core.project.annotation.Module;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.jaxrs.WrapClearCacheRequest;
import com.x.base.core.project.queue.AbstractQueue;
import com.x.base.core.project.schedule.AbstractJob;
......@@ -15,11 +19,13 @@ public abstract class AbstractContext {
protected static final String INITPARAMETER_PORJECT = "project";
public abstract Applications applications() throws Exception;
public static final String SERVLETCONTEXT_ATTRIBUTE_APPLICATION = "application";
protected ThreadFactory threadFactory;
// public abstract Applications applications() throws Exception;
public abstract ThreadFactory threadFactory();
// protected ThreadFactory threadFactory;
//
// public abstract ThreadFactory threadFactory();
// 应用类
protected Class<?> clazz;
......@@ -48,4 +54,20 @@ public abstract class AbstractContext {
public abstract <T extends AbstractJob> void fireScheduleOnLocal(Class<T> cls, Integer delay) throws Exception;
public abstract AbstractQueue<WrapClearCacheRequest> clearCacheRequestQueue();
private volatile Date applicationsTimestamp = null;
public Applications applications() throws Exception {
if (null != Config.resource_node_applicationsTimestamp()) {
if (null == this.applicationsTimestamp
|| (this.applicationsTimestamp.before(Config.resource_node_applicationsTimestamp()))) {
synchronized (this) {
this.applications = XGsonBuilder.instance().fromJson(Config.resource_node_applications(),
Applications.class);
this.applicationsTimestamp = Config.resource_node_applicationsTimestamp();
}
}
}
return this.applications;
}
}
package com.x.base.core.project;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
......@@ -139,25 +138,9 @@ public class Context extends AbstractContext {
return this.clearCacheRequestQueue;
}
private volatile Date applicationsTimestamp = null;
public Applications applications() throws Exception {
if (null != Config.resource_node_applicationsTimestamp()) {
if (null == this.applicationsTimestamp
|| (this.applicationsTimestamp.before(Config.resource_node_applicationsTimestamp()))) {
synchronized (this) {
this.applications = XGsonBuilder.instance().fromJson(Config.resource_node_applications(),
Applications.class);
this.applicationsTimestamp = Config.resource_node_applicationsTimestamp();
}
}
}
return this.applications;
}
public ThreadFactory threadFactory() {
return this.threadFactory;
}
// public ThreadFactory threadFactory() {
// return this.threadFactory;
// }
/* 队列 */
private List<AbstractQueue<?>> queues;
......@@ -187,7 +170,7 @@ public class Context extends AbstractContext {
context.scheduleWeight = Config.currentNode().getApplication().scheduleWeight(context.clazz);
context.sslEnable = Config.currentNode().getApplication().getSslEnable();
context.initDatas();
context.threadFactory = new ThreadFactory(context);
// context.threadFactory = new ThreadFactory(context);
servletContext.setAttribute(AbstractContext.class.getName(), context);
context.initialized = true;
return context;
......@@ -208,6 +191,8 @@ public class Context extends AbstractContext {
application.setScheduleLocalRequestList(this.scheduleLocalRequestList);
application.setScheduleRequestList(this.scheduleRequestList);
JsonElement jsonElement = XGsonBuilder.instance().toJsonTree(application);
// 将当前的application写入到servletContext
servletContext.setAttribute(SERVLETCONTEXT_ATTRIBUTE_APPLICATION, jsonElement.toString());
JsonObject jsonObject = jsonElement.getAsJsonObject();
jsonObject.addProperty("type", "registApplication");
Config.resource_node_eventQueue().put(jsonObject);
......
package com.x.base.core.project;
import com.x.base.core.project.exception.PromptException;
public class ExceptionNotFindPirmaryCenterServer extends PromptException {
private static final long serialVersionUID = -4834776351837354462L;
public ExceptionNotFindPirmaryCenterServer(String node) {
super("can not find pirmary center server node: {}.", node);
}
}
......@@ -11,6 +11,7 @@ public class Node extends ConfigObject {
public static final Integer DEFAULT_NODEAGENTPORT = 20010;
public static final String DEFAULT_BANNER = "O2OA";
public static final Integer DEFAULT_LOGSIZE = 14;
public static final Boolean DEFAULT_SELFHEALTHCHECKENABLE = false;
public static Node defaultInstance() {
Node o = new Node();
......@@ -29,6 +30,7 @@ public class Node extends ConfigObject {
o.nodeAgentPort = DEFAULT_NODEAGENTPORT;
o.quickStartWebApp = false;
o.autoStart = true;
o.selfHealthCheckEnable = false;
return o;
}
......@@ -70,6 +72,12 @@ public class Node extends ConfigObject {
private Boolean autoStart;
@FieldDescribe("是否允许使用擦除数据功能")
private Boolean eraseContentEnable;
@FieldDescribe("是否启用节点上模块健康自检查,如果启用在提交到center之前将进行模块的健康检查.默认false")
private Boolean selfHealthCheckEnable;
public Boolean getSelfHealthCheckEnable() {
return BooleanUtils.isTrue(selfHealthCheckEnable);
}
/* 20191009兼容centerServer */
protected void setCenter(CenterServer centerServer) {
......
package com.x.base.core.project.config;
import java.io.File;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListMap;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.tools.DefaultCharset;
import com.x.base.core.project.tools.Host;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.tools.DefaultCharset;
import com.x.base.core.project.tools.ListTools;
import org.apache.commons.lang3.StringUtils;
public class WebServers extends ConcurrentSkipListMap<String, WebServer> {
......@@ -82,37 +89,67 @@ public class WebServers extends ConcurrentSkipListMap<String, WebServer> {
LinkedHashMap<String, Object> map = new LinkedHashMap<>();
/** 覆盖掉配置的参数 */
com.x.base.core.project.config.CenterServer centerServerConfig = Config.nodes().centerServers().first()
.getValue();
map.putAll(centerServerConfig.getConfig());
// 先取本节点的center如果没有那么取第一个center
com.x.base.core.project.config.CenterServer centerServerConfig = Config.currentNode().getCenter();
List<Map<String, String>> centers = new ArrayList<>();
map.put("center", centers);
/** 写入center地址 */
Map<String, String> center = new HashMap<String, String>();
center = new HashMap<String, String>();
if ((null == centerServerConfig) || BooleanUtils.isNotTrue(centerServerConfig.getEnable())) {
Entry<String, CenterServer> entry = Config.nodes().centerServers().orderedEntry().get(0);
centerServerConfig = entry.getValue();
Map<String, String> center = new HashMap<>();
center.put("host", entry.getKey());
center.put("port", centerServerConfig.getPort().toString());
centers.add(center);
if (StringUtils.isNotEmpty(centerServerConfig.getProxyHost())) {
center = new HashMap<>();
center.put("host", centerServerConfig.getProxyHost());
center.put("port", centerServerConfig.getProxyPort().toString());
centers.add(center);
}
if (!Objects.equals(centerServerConfig.getProxyPort(), centerServerConfig.getPort())) {
center = new HashMap<>();
center.put("host", entry.getKey());
center.put("port", centerServerConfig.getProxyPort().toString());
centers.add(center);
}
}
Map<String, String> center = new HashMap<>();
center.put("host", "");
center.put("port", centerServerConfig.getPort().toString());
centers.add(center);
if (!Objects.equals(centerServerConfig.getProxyPort(), centerServerConfig.getPort())) {
center = new HashMap<String, String>();
center = new HashMap<>();
center.put("host", "");
center.put("port", centerServerConfig.getProxyPort().toString());
centers.add(center);
}
String host = Config.nodes().primaryCenterNode();
if (!Host.isRollback(host)) {
center = new HashMap<String, String>();
center.put("host", host);
center.put("port", centerServerConfig.getPort().toString());
centers.add(center);
}
/** 写入proxy地址 */
if (StringUtils.isNotEmpty(centerServerConfig.getProxyHost())) {
center = new HashMap<String, String>();
center.put("host", centerServerConfig.getProxyHost());
center.put("port", centerServerConfig.getProxyPort().toString());
centers.add(center);
}
map.putAll(centerServerConfig.getConfig());
// /** 写入center地址 */
// Map<String, String> center = new HashMap<String, String>();
// center.put("host", "");
// center.put("port", centerServerConfig.getPort().toString());
// centers.add(center);
// if (!Objects.equals(centerServerConfig.getProxyPort(), centerServerConfig.getPort())) {
// center = new HashMap<String, String>();
// center.put("host", "");
// center.put("port", centerServerConfig.getProxyPort().toString());
// centers.add(center);
// }
//
// String host = Config.nodes().primaryCenterNode();
// if (!Host.isRollback(host)) {
// center = new HashMap<String, String>();
// center.put("host", host);
// center.put("port", centerServerConfig.getPort().toString());
// centers.add(center);
// }
// /** 写入proxy地址 */
// if (StringUtils.isNotEmpty(centerServerConfig.getProxyHost())) {
// center = new HashMap<String, String>();
// center.put("host", centerServerConfig.getProxyHost());
// center.put("port", centerServerConfig.getProxyPort().toString());
// centers.add(center);
// }
/** 写入systemName */
map.put("footer", Config.collect().getFooter());
......@@ -120,7 +157,7 @@ public class WebServers extends ConcurrentSkipListMap<String, WebServer> {
map.put("version", Config.version());
map.put("appUrl", Config.collect().getAppUrl());
/***/
if (centerServerConfig.getSslEnable()) {
if (BooleanUtils.isTrue(centerServerConfig.getSslEnable())) {
map.put("app_protocol", "https:");
} else {
map.put("app_protocol", "http:");
......
......@@ -27,11 +27,26 @@ public class CipherConnectionAction {
return ConnectionAction.get(address, headers);
}
public static ActionResponse get(Boolean xdebugger, int connectTimeout, int readTimeout, String address)
throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.get(connectTimeout, readTimeout, address, headers);
}
public static ActionResponse get(Boolean xdebugger, Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return get(xdebugger, addr);
}
public static ActionResponse get(Boolean xdebugger, int connectTimeout, int readTimeout, Application application,
String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return get(xdebugger, connectTimeout, readTimeout, addr);
}
public static byte[] getBinary(Boolean xdebugger, String address) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
......@@ -40,11 +55,26 @@ public class CipherConnectionAction {
return ConnectionAction.getBinary(address, headers);
}
public static byte[] getBinary(Boolean xdebugger, int connectTimeout, int readTimeout, String address)
throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.getBinary(connectTimeout, readTimeout, address, headers);
}
public static byte[] getBinary(Boolean xdebugger, Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return getBinary(xdebugger, addr);
}
public static byte[] getBinary(Boolean xdebugger, int connectTimeout, int readTimeout, Application application,
String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return getBinary(xdebugger, connectTimeout, readTimeout, addr);
}
public static ActionResponse delete(Boolean xdebugger, String address) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
......@@ -53,11 +83,26 @@ public class CipherConnectionAction {
return ConnectionAction.delete(address, headers);
}
public static ActionResponse delete(Boolean xdebugger, int connectTimeout, int readTimeout, String address)
throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.delete(connectTimeout, readTimeout, address, headers);
}
public static ActionResponse delete(Boolean xdebugger, Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return delete(xdebugger, addr);
}
public static ActionResponse delete(Boolean xdebugger, int connectTimeout, int readTimeout, Application application,
String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return delete(xdebugger, connectTimeout, readTimeout, addr);
}
public static byte[] deleteBinary(Boolean xdebugger, String address) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
......@@ -66,11 +111,26 @@ public class CipherConnectionAction {
return ConnectionAction.deleteBinary(address, headers);
}
public static byte[] deleteBinary(Boolean xdebugger, int connectTimeout, int readTimeout, String address)
throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.deleteBinary(connectTimeout, readTimeout, address, headers);
}
public static byte[] deleteBinary(Boolean xdebugger, Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return deleteBinary(xdebugger, addr);
}
public static byte[] deleteBinary(Boolean xdebugger, int connectTimeout, int readTimeout, Application application,
String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return deleteBinary(xdebugger, connectTimeout, readTimeout, addr);
}
public static ActionResponse post(Boolean xdebugger, String address, Object body) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
......@@ -79,12 +139,27 @@ public class CipherConnectionAction {
return ConnectionAction.post(address, headers, body);
}
public static ActionResponse post(Boolean xdebugger, int connectTimeout, int readTimeout, String address,
Object body) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.post(connectTimeout, readTimeout, address, headers, body);
}
public static ActionResponse post(Boolean xdebugger, Object body, Application application, String... strs)
throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return post(xdebugger, addr, body);
}
public static ActionResponse post(Boolean xdebugger, int connectTimeout, int readTimeout, Object body,
Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return post(xdebugger, connectTimeout, readTimeout, addr, body);
}
public static byte[] postBinary(Boolean xdebugger, String address, Object body) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
......@@ -93,12 +168,27 @@ public class CipherConnectionAction {
return ConnectionAction.postBinary(address, headers, body);
}
public static byte[] postBinary(Boolean xdebugger, int connectTimeout, int readTimeout, String address, Object body)
throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.postBinary(connectTimeout, readTimeout, address, headers, body);
}
public static byte[] postBinary(Boolean xdebugger, Object body, Application application, String... strs)
throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return postBinary(xdebugger, addr, body);
}
public static byte[] postBinary(Boolean xdebugger, int connectTimeout, int readTimeout, Object body,
Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return postBinary(xdebugger, connectTimeout, readTimeout, addr, body);
}
public static byte[] postMultiPartBinary(Boolean xdebugger, String address, Collection<FormField> formFields,
Collection<FilePart> fileParts) throws Exception {
List<NameValuePair> headers = cipher();
......@@ -108,12 +198,29 @@ public class CipherConnectionAction {
return ConnectionAction.postMultiPartBinary(address, headers, formFields, fileParts);
}
public static byte[] postMultiPartBinary(Boolean xdebugger, int connectTimeout, int readTimeout, String address,
Collection<FormField> formFields, Collection<FilePart> fileParts) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.postMultiPartBinary(connectTimeout, readTimeout, address, headers, formFields,
fileParts);
}
public static byte[] postMultiPartBinary(Boolean xdebugger, Collection<FormField> formFields,
Collection<FilePart> fileParts, Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return postMultiPartBinary(xdebugger, addr, formFields, fileParts);
}
public static byte[] postMultiPartBinary(Boolean xdebugger, int connectTimeout, int readTimeout,
Collection<FormField> formFields, Collection<FilePart> fileParts, Application application, String... strs)
throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return postMultiPartBinary(xdebugger, connectTimeout, readTimeout, addr, formFields, fileParts);
}
public static ActionResponse put(Boolean xdebugger, String address, Object body) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
......@@ -122,12 +229,27 @@ public class CipherConnectionAction {
return ConnectionAction.put(address, headers, body);
}
public static ActionResponse put(Boolean xdebugger, int connectTimeout, int readTimeout, String address,
Object body) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.put(connectTimeout, readTimeout, address, headers, body);
}
public static ActionResponse put(Boolean xdebugger, Object body, Application application, String... strs)
throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return put(xdebugger, addr, body);
}
public static ActionResponse put(Boolean xdebugger, int connectTimeout, int readTimeout, Object body,
Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return put(xdebugger, connectTimeout, readTimeout, addr, body);
}
public static byte[] putBinary(Boolean xdebugger, String address, Object body) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
......@@ -136,12 +258,27 @@ public class CipherConnectionAction {
return ConnectionAction.putBinary(address, headers, body);
}
public static byte[] putBinary(Boolean xdebugger, int connectTimeout, int readTimeout, String address, Object body)
throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.putBinary(connectTimeout, readTimeout, address, headers, body);
}
public static byte[] putBinary(Boolean xdebugger, Object body, Application application, String... strs)
throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return putBinary(xdebugger, addr, body);
}
public static byte[] putBinary(Boolean xdebugger, int connectTimeout, int readTimeout, Object body,
Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return putBinary(xdebugger, connectTimeout, readTimeout, addr, body);
}
public static byte[] putMultiPartBinary(Boolean xdebugger, String address, Collection<FormField> formFields,
Collection<FilePart> fileParts) throws Exception {
List<NameValuePair> headers = cipher();
......@@ -151,15 +288,31 @@ public class CipherConnectionAction {
return ConnectionAction.putMultiPartBinary(address, headers, formFields, fileParts);
}
public static byte[] putMultiPartBinary(Boolean xdebugger, int connectTimeout, int readTimeout, String address,
Collection<FormField> formFields, Collection<FilePart> fileParts) throws Exception {
List<NameValuePair> headers = cipher();
if (BooleanUtils.isTrue(xdebugger)) {
headers.add(new NameValuePair(HttpToken.X_Debugger, true));
}
return ConnectionAction.putMultiPartBinary(connectTimeout, readTimeout, address, headers, formFields,
fileParts);
}
public static byte[] putMultiPartBinary(Boolean xdebugger, Collection<FormField> formFields,
Collection<FilePart> fileParts, Application application, String... strs) throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return putMultiPartBinary(xdebugger, addr, formFields, fileParts);
}
public static byte[] putMultiPartBinary(Boolean xdebugger, int connectTimeout, int readTimeout,
Collection<FormField> formFields, Collection<FilePart> fileParts, Application application, String... strs)
throws Exception {
String addr = application.getUrlJaxrsRoot() + trim(Applications.joinQueryUri(strs));
return putMultiPartBinary(xdebugger, connectTimeout, readTimeout, addr, formFields, fileParts);
}
public static List<NameValuePair> cipher() throws Exception {
EffectivePerson effectivePerson = EffectivePerson.cipher(Config.token().getCipher());
// return ListTools.toList(new NameValuePair(HttpToken.X_Token, effectivePerson.getToken()));
return ListTools.toList(new NameValuePair(Config.person().getTokenName(), effectivePerson.getToken()));
}
......
......@@ -13,6 +13,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
......@@ -24,16 +27,11 @@ import com.x.base.core.project.tools.DefaultCharset;
import com.x.base.core.project.tools.ListTools;
import com.x.base.core.project.tools.StringTools;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
public class ConnectionAction {
private static final int DEFAULT_CONNECTTIMEOUT = 2000;
private static final int DEFAULT_READTIMEOUT = 5 * 60 * 1000;
private ConnectionAction() {
}
......@@ -57,7 +55,8 @@ public class ConnectionAction {
private static Gson gson = XGsonBuilder.instance();
private static ActionResponse getDelete(String address, String method, List<NameValuePair> heads) throws Exception {
private static ActionResponse getDelete(int connectTimeout, int readTimeout, String address, String method,
List<NameValuePair> heads) throws Exception {
ActionResponse response = new ActionResponse();
HttpURLConnection connection = null;
try {
......@@ -74,6 +73,8 @@ public class ConnectionAction {
connection.setUseCaches(false);
connection.setDoOutput(false);
connection.setDoInput(true);
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
try {
connection.connect();
} catch (Exception e) {
......@@ -85,15 +86,26 @@ public class ConnectionAction {
return read(response, connection);
}
public static ActionResponse get(int connectTimeout, int readTimeout, String address, List<NameValuePair> heads)
throws Exception {
return getDelete(connectTimeout, readTimeout, address, METHOD_GET, heads);
}
public static ActionResponse get(String address, List<NameValuePair> heads) throws Exception {
return getDelete(address, METHOD_GET, heads);
return getDelete(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_GET, heads);
}
public static ActionResponse delete(int connectTimeout, int readTimeout, String address, List<NameValuePair> heads)
throws Exception {
return getDelete(connectTimeout, readTimeout, address, METHOD_DELETE, heads);
}
public static ActionResponse delete(String address, List<NameValuePair> heads) throws Exception {
return getDelete(address, METHOD_DELETE, heads);
return getDelete(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_DELETE, heads);
}
private static byte[] getDeleteBinary(String address, String method, List<NameValuePair> heads) throws Exception {
private static byte[] getDeleteBinary(int connectTimeout, int readTimeout, String address, String method,
List<NameValuePair> heads) throws Exception {
HttpURLConnection connection = null;
try {
URL url = new URL(address);
......@@ -106,6 +118,8 @@ public class ConnectionAction {
connection.setUseCaches(false);
connection.setDoOutput(false);
connection.setDoInput(true);
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
try {
connection.connect();
} catch (Exception e) {
......@@ -114,16 +128,26 @@ public class ConnectionAction {
return readBinary(connection);
}
public static byte[] getBinary(int connectTimeout, int readTimeout, String address, List<NameValuePair> heads)
throws Exception {
return getDeleteBinary(connectTimeout, readTimeout, address, METHOD_GET, heads);
}
public static byte[] getBinary(String address, List<NameValuePair> heads) throws Exception {
return getDeleteBinary(address, METHOD_GET, heads);
return getDeleteBinary(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_GET, heads);
}
public static byte[] deleteBinary(int connectTimeout, int readTimeout, String address, List<NameValuePair> heads)
throws Exception {
return getDeleteBinary(connectTimeout, readTimeout, address, METHOD_DELETE, heads);
}
public static byte[] deleteBinary(String address, List<NameValuePair> heads) throws Exception {
return getDeleteBinary(address, METHOD_DELETE, heads);
return getDeleteBinary(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_DELETE, heads);
}
public static ActionResponse postPut(String address, String method, List<NameValuePair> heads, Object body)
throws Exception {
private static ActionResponse postPut(int connectTimeout, int readTimeout, String address, String method,
List<NameValuePair> heads, Object body) throws Exception {
ActionResponse response = new ActionResponse();
HttpURLConnection connection = null;
try {
......@@ -140,6 +164,8 @@ public class ConnectionAction {
connection.setUseCaches(false);
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
try {
connection.connect();
} catch (Exception e) {
......@@ -165,16 +191,26 @@ public class ConnectionAction {
return read(response, connection);
}
public static ActionResponse post(int connectTimeout, int readTimeout, String address, List<NameValuePair> heads,
Object body) throws Exception {
return postPut(connectTimeout, readTimeout, address, METHOD_POST, heads, body);
}
public static ActionResponse post(String address, List<NameValuePair> heads, Object body) throws Exception {
return postPut(address, METHOD_POST, heads, body);
return postPut(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_POST, heads, body);
}
public static ActionResponse put(int connectTimeout, int readTimeout, String address, List<NameValuePair> heads,
Object body) throws Exception {
return postPut(connectTimeout, readTimeout, address, METHOD_PUT, heads, body);
}
public static ActionResponse put(String address, List<NameValuePair> heads, Object body) throws Exception {
return postPut(address, METHOD_PUT, heads, body);
return postPut(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_PUT, heads, body);
}
private static byte[] postPutBinary(String address, String method, List<NameValuePair> heads, Object body)
throws Exception {
private static byte[] postPutBinary(int connectTimeout, int readTimeout, String address, String method,
List<NameValuePair> heads, Object body) throws Exception {
HttpURLConnection connection = null;
try {
URL url = new URL(address);
......@@ -187,6 +223,8 @@ public class ConnectionAction {
connection.setUseCaches(false);
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
try {
connection.connect();
} catch (Exception e) {
......@@ -206,16 +244,27 @@ public class ConnectionAction {
return readBinary(connection);
}
public static byte[] postBinary(int connectTimeout, int readTimeout, String address, List<NameValuePair> heads,
Object body) throws Exception {
return postPutBinary(connectTimeout, readTimeout, address, METHOD_POST, heads, body);
}
public static byte[] postBinary(String address, List<NameValuePair> heads, Object body) throws Exception {
return postPutBinary(address, METHOD_POST, heads, body);
return postPutBinary(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_POST, heads, body);
}
public static byte[] putBinary(int connectTimeout, int readTimeout, String address, List<NameValuePair> heads,
Object body) throws Exception {
return postPutBinary(connectTimeout, readTimeout, address, METHOD_PUT, heads, body);
}
public static byte[] putBinary(String address, List<NameValuePair> heads, Object body) throws Exception {
return postPutBinary(address, METHOD_PUT, heads, body);
return postPutBinary(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_PUT, heads, body);
}
private static byte[] postPutMultiPartBinary(String address, String method, List<NameValuePair> heads,
Collection<FormField> formFields, Collection<FilePart> fileParts) throws Exception {
private static byte[] postPutMultiPartBinary(int connectTimeout, int readTimeout, String address, String method,
List<NameValuePair> heads, Collection<FormField> formFields, Collection<FilePart> fileParts)
throws Exception {
HttpURLConnection connection = null;
String boundary = StringTools.TWO_HYPHENS + StringTools.TWO_HYPHENS + System.currentTimeMillis();
try {
......@@ -248,6 +297,8 @@ public class ConnectionAction {
connection.setUseCaches(false);
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
try {
connection.connect();
} catch (Exception e) {
......@@ -261,14 +312,28 @@ public class ConnectionAction {
return readBinary(connection);
}
public static byte[] postMultiPartBinary(int connectTimeout, int readTimeout, String address,
List<NameValuePair> heads, Collection<FormField> formFields, Collection<FilePart> fileParts)
throws Exception {
return postPutMultiPartBinary(connectTimeout, readTimeout, address, METHOD_POST, heads, formFields, fileParts);
}
public static byte[] postMultiPartBinary(String address, List<NameValuePair> heads,
Collection<FormField> formFields, Collection<FilePart> fileParts) throws Exception {
return postPutMultiPartBinary(address, METHOD_POST, heads, formFields, fileParts);
return postPutMultiPartBinary(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_POST, heads,
formFields, fileParts);
}
public static byte[] putMultiPartBinary(int connectTimeout, int readTimeout, String address,
List<NameValuePair> heads, Collection<FormField> formFields, Collection<FilePart> fileParts)
throws Exception {
return postPutMultiPartBinary(connectTimeout, readTimeout, address, METHOD_PUT, heads, formFields, fileParts);
}
public static byte[] putMultiPartBinary(String address, List<NameValuePair> heads, Collection<FormField> formFields,
Collection<FilePart> fileParts) throws Exception {
return postPutMultiPartBinary(address, METHOD_PUT, heads, formFields, fileParts);
return postPutMultiPartBinary(DEFAULT_CONNECTTIMEOUT, DEFAULT_READTIMEOUT, address, METHOD_PUT, heads,
formFields, fileParts);
}
private static void writeFormField(OutputStream output, FormField formField, String boundary) throws IOException {
......@@ -307,31 +372,31 @@ public class ConnectionAction {
IOUtils.write(StringTools.CRLF, output, StandardCharsets.UTF_8);
}
public static byte[] getFile(String address, List<NameValuePair> heads) throws Exception {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpget = new HttpGet(address);
if (ListTools.isNotEmpty(heads)) {
String name;
String value;
for (NameValuePair o : heads) {
name = Objects.toString(o.getName(), "");
value = Objects.toString(o.getValue(), "");
if (StringUtils.isNotEmpty(name) && StringUtils.isNotEmpty(value)) {
httpget.addHeader(name, value);
}
}
}
HttpResponse response = httpclient.execute(httpget);
HttpEntity entity = response.getEntity();
if (entity != null) {
InputStream in = entity.getContent();
if (in != null) {
return IOUtils.toByteArray(in);
}
}
}
return null;
}
// public static byte[] getFile(String address, List<NameValuePair> heads) throws ClientProtocolException, IOException {
// try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
// HttpGet httpget = new HttpGet(address);
// if (ListTools.isNotEmpty(heads)) {
// String name;
// String value;
// for (NameValuePair o : heads) {
// name = Objects.toString(o.getName(), "");
// value = Objects.toString(o.getValue(), "");
// if (StringUtils.isNotEmpty(name) && StringUtils.isNotEmpty(value)) {
// httpget.addHeader(name, value);
// }
// }
// }
// HttpResponse response = httpclient.execute(httpget);
// HttpEntity entity = response.getEntity();
// if (entity != null) {
// InputStream in = entity.getContent();
// if (in != null) {
// return IOUtils.toByteArray(in);
// }
// }
// }
// return null;
// }
private static String extractErrorMessageIfExist(String str) {
if (StringUtils.isBlank(str)) {
......@@ -379,7 +444,7 @@ public class ConnectionAction {
return response;
}
private static byte[] readBinary(HttpURLConnection connection) throws Exception {
private static byte[] readBinary(HttpURLConnection connection) throws ExceptionReadBinary, IOException {
int code = connection.getResponseCode();
byte[] bytes = null;
if (code >= 500) {
......
......@@ -12,7 +12,6 @@ import com.x.base.core.project.jaxrs.echo.EchoAction;
import com.x.base.core.project.jaxrs.fireschedule.FireScheduleAction;
import com.x.base.core.project.jaxrs.logger.LoggerAction;
import com.x.base.core.project.jaxrs.sysresource.SysResourceAction;
import com.x.base.core.project.jaxrs.thread.ThreadAction;
public abstract class AbstractActionApplication extends Application {
protected Set<Object> singletons = new HashSet<>();
......@@ -24,7 +23,6 @@ public abstract class AbstractActionApplication extends Application {
classes.add(LoggerAction.class);
classes.add(FireScheduleAction.class);
classes.add(SysResourceAction.class);
classes.add(ThreadAction.class);
// providers
classes.add(MessageBodyReaderImpl.class);
classes.add(MultiPartFeature.class);
......
package com.x.base.core.project.jaxrs;
import javax.servlet.annotation.WebFilter;
@WebFilter(urlPatterns = { "/jaxrs/thread/*" }, asyncSupported = true)
public class ThreadJaxrsFilter extends CipherManagerJaxrsFilter {
}
package com.x.base.core.project.jaxrs.echo;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContext;
......@@ -27,6 +29,8 @@ class ActionGet extends BaseAction {
public static class Wo extends GsonPropertyObject {
private static final long serialVersionUID = 3728516737526214370L;
@FieldDescribe("上下文根")
private String servletContextName;
......
package com.x.base.core.project.jaxrs.thread;
import javax.servlet.ServletContext;
import com.x.base.core.project.AbstractContext;
import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.jaxrs.WrapString;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
class ActionAlive extends BaseAction {
private static Logger logger = LoggerFactory.getLogger(ActionAlive.class);
ActionResult<Wo> execute(EffectivePerson effectivePerson, ServletContext servletContext, String name)
throws Exception {
ActionResult<Wo> result = new ActionResult<>();
AbstractContext ctx = com.x.base.core.project.Context.fromServletContext(servletContext);
Wo wo = new Wo();
wo.setValue(ctx.threadFactory().aliveLocal(name));
result.setData(wo);
return result;
}
public static class Wo extends WrapString {
private static final long serialVersionUID = 6928521934755900177L;
}
}
\ No newline at end of file
package com.x.base.core.project.jaxrs.thread;
import java.util.Map;
import java.util.TreeMap;
import javax.servlet.ServletContext;
import com.x.base.core.project.AbstractContext;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
class ActionParameter extends BaseAction {
private static Logger logger = LoggerFactory.getLogger(ActionParameter.class);
ActionResult<Wo> execute(EffectivePerson effectivePerson, ServletContext servletContext, String name)
throws Exception {
ActionResult<Wo> result = new ActionResult<>();
AbstractContext ctx = com.x.base.core.project.Context.fromServletContext(servletContext);
Wo wo = null;
Map<Object, Object> value = ctx.threadFactory().parameterLocal(name);
if (null != value) {
wo = XGsonBuilder.convert(value, Wo.class);
}
result.setData(wo);
return result;
}
public static class Wo extends TreeMap<Object, Object> {
private static final long serialVersionUID = 6928521934755900177L;
}
}
\ No newline at end of file
package com.x.base.core.project.jaxrs.thread;
import javax.servlet.ServletContext;
import com.x.base.core.project.AbstractContext;
import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.jaxrs.WrapBoolean;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
class ActionStop extends BaseAction {
private static Logger logger = LoggerFactory.getLogger(ActionStop.class);
ActionResult<Wo> execute(EffectivePerson effectivePerson, ServletContext servletContext, String name)
throws Exception {
ActionResult<Wo> result = new ActionResult<>();
AbstractContext ctx = com.x.base.core.project.Context.fromServletContext(servletContext);
Wo wo = new Wo();
wo.setValue(ctx.threadFactory().stopLocal(name));
result.setData(wo);
return result;
}
public static class Wo extends WrapBoolean {
private static final long serialVersionUID = 3426973660260791768L;
}
}
\ No newline at end of file
package com.x.base.core.project.jaxrs.thread;
import com.x.base.core.project.jaxrs.StandardJaxrsAction;
abstract class BaseAction extends StandardJaxrsAction {
}
package com.x.base.core.project.jaxrs.thread;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import com.x.base.core.project.annotation.JaxrsDescribe;
import com.x.base.core.project.annotation.JaxrsMethodDescribe;
import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.http.HttpMediaType;
import com.x.base.core.project.jaxrs.ResponseFactory;
import com.x.base.core.project.jaxrs.StandardJaxrsAction;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
@Path("thread")
@JaxrsDescribe("线程接口")
public class ThreadAction extends StandardJaxrsAction {
private static Logger logger = LoggerFactory.getLogger(ThreadAction.class);
@JaxrsMethodDescribe(value = "是否运行中.", action = ActionAlive.class)
@GET
@Produces(HttpMediaType.APPLICATION_JSON_UTF_8)
@Consumes(MediaType.APPLICATION_JSON)
@Path("alive/{name}")
public void alive(@Suspended final AsyncResponse asyncResponse, @Context HttpServletRequest request,
@Context ServletContext servletContext, @PathParam("name") String name) {
ActionResult<ActionAlive.Wo> result = new ActionResult<>();
EffectivePerson effectivePerson = this.effectivePerson(request);
try {
result = new ActionAlive().execute(effectivePerson, servletContext, name);
} catch (Exception e) {
logger.error(e, effectivePerson, request, null);
result.error(e);
}
asyncResponse.resume(ResponseFactory.getEntityTagActionResultResponse(request, result));
}
@JaxrsMethodDescribe(value = "获取参数.", action = ActionParameter.class)
@GET
@Produces(HttpMediaType.APPLICATION_JSON_UTF_8)
@Consumes(MediaType.APPLICATION_JSON)
@Path("parameter/{name}")
public void parameter(@Suspended final AsyncResponse asyncResponse, @Context HttpServletRequest request,
@Context ServletContext servletContext, @PathParam("name") String name) {
ActionResult<ActionParameter.Wo> result = new ActionResult<>();
EffectivePerson effectivePerson = this.effectivePerson(request);
try {
result = new ActionParameter().execute(effectivePerson, servletContext, name);
} catch (Exception e) {
logger.error(e, effectivePerson, request, null);
result.error(e);
}
asyncResponse.resume(ResponseFactory.getEntityTagActionResultResponse(request, result));
}
@JaxrsMethodDescribe(value = "停止运行.", action = ActionStop.class)
@GET
@Produces(HttpMediaType.APPLICATION_JSON_UTF_8)
@Consumes(MediaType.APPLICATION_JSON)
@Path("stop/{name}")
public void stop(@Suspended final AsyncResponse asyncResponse, @Context HttpServletRequest request,
@Context ServletContext servletContext, @PathParam("name") String name) {
ActionResult<ActionStop.Wo> result = new ActionResult<>();
EffectivePerson effectivePerson = this.effectivePerson(request);
try {
result = new ActionStop().execute(effectivePerson, servletContext, name);
} catch (Exception e) {
logger.error(e, effectivePerson, request, null);
result.error(e);
}
asyncResponse.resume(ResponseFactory.getEntityTagActionResultResponse(request, result));
}
}
\ No newline at end of file
package com.x.base.core.project.queue;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.gson.Gson;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
......@@ -13,82 +9,73 @@ public abstract class AbstractQueue<T> {
private static Logger logger = LoggerFactory.getLogger(AbstractQueue.class);
private static Gson gson = XGsonBuilder.instance();
private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private volatile boolean turn = false;
private volatile boolean turn = true;
private String className = this.getClass().getName();
private List<Object> executings = new CopyOnWriteArrayList<>();
// private List<Object> executings = new CopyOnWriteArrayList<>();
public boolean executing(T t) {
if (null == t) {
return false;
} else {
return executings.contains(t);
}
}
// public boolean executing(T t) {
// if (null == t) {
// return false;
// } else {
// return executings.contains(t);
// }
// }
public boolean contains(T t) {
if (null == t) {
return false;
}
if (this.queue.contains(t)) {
return true;
}
return this.executing(t);
return this.queue.contains(t);
// return false;
// return this.executing(t);
}
/**
* 标识自己,将会传入到执行线程中使用
*/
private AbstractQueue<T> abstractQueue = this;
/**
* 将创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
*/
// private ExecutorService executorService = null;
public void send(T t) throws Exception {
queue.put(t);
if (null != t) {
queue.put(t);
}
}
public void start() {
if (turn) {
return;
}
turn = true;
new Thread() {
// if (turn) {
// return;
// }
// turn = true;
new Thread(className) {
@SuppressWarnings("unchecked")
@Override
public void run() {
Object o = null;
while (turn) {
try {
o = queue.take();
if (null != o) {
executings.add(o);
if (o instanceof StopSignal) {
turn = false;
break;
}
execute((T) o);
// if (null != o) {
// executings.add(o);
if (o instanceof StopSignal) {
turn = false;
break;
}
execute((T) o);
// }
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != o) {
executings.remove(o);
}
} catch (Exception e) {
e.printStackTrace();
}
}
// } finally {
// try {
// if (null != o) {
// executings.remove(o);
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
}
}
}.start();
logger.info("queue class: {} start.", className);
}
......@@ -109,5 +96,6 @@ public abstract class AbstractQueue<T> {
}
private static class StopSignal {
// nothing
}
}
package com.x.server.console;
import org.apache.commons.lang3.BooleanUtils;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.server.console.node.RefreshApplicationsEvent;
import com.x.server.console.node.RegistApplicationsEvent;
import com.x.server.console.node.UpdateApplicationsEvent;
import com.x.server.console.node.VoteCenterEvent;
import com.x.server.console.server.Servers;
import org.apache.commons.lang3.BooleanUtils;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public class RegistApplicationsAndVoteCenterTask implements Job {
private static Logger logger = LoggerFactory.getLogger(RegistApplicationsAndVoteCenterTask.class);
......@@ -21,17 +22,20 @@ public class RegistApplicationsAndVoteCenterTask implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
// 先选举center
Config.resource_node_eventQueue().put(XGsonBuilder.instance().toJsonTree(new VoteCenterEvent()));
if (BooleanUtils.isTrue(Servers.applicationServerIsRunning())) {
// 刷新本地application
Config.resource_node_eventQueue().put(XGsonBuilder.instance().toJsonTree(new RefreshApplicationsEvent()));
if (BooleanUtils.isTrue(Servers.applicationServerIsStarted())) {
// 先选举center
Config.resource_node_eventQueue().put(XGsonBuilder.instance().toJsonTree(new VoteCenterEvent()));
// 注册node上所有的appliction到各个center
Config.resource_node_eventQueue()
.put(XGsonBuilder.instance().toJsonTree(new RegistApplicationsEvent()));
} else {
Config.resource_node_eventQueue()
.put(XGsonBuilder.instance().toJsonTree(new UpdateApplicationsEvent()));
}
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
Thread.currentThread().interrupt();
}
}
}
\ No newline at end of file
......@@ -19,6 +19,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.jetty.plus.jndi.Resource;
import org.eclipse.jetty.util.RolloverFileOutputStream;
......@@ -209,7 +210,8 @@ public class ResourceFactory {
private static void processPlatformExecutors() throws Exception {
ExecutorService[] services = new ExecutorService[Config.processPlatform().getExecutorCount()];
for (int i = 0; i < Config.processPlatform().getExecutorCount(); i++) {
services[i] = Executors.newFixedThreadPool(1);
services[i] = Executors.newFixedThreadPool(1, new BasicThreadFactory.Builder()
.namingPattern("ProcessPlatformExecutor-" + i).daemon(true).build());
}
new Resource(Config.RESOURCE_NODE_PROCESSPLATFORMEXECUTORS, services);
......
//package com.x.server.console;
//
//import com.x.base.core.project.config.Config;
//import com.x.base.core.project.logger.Logger;
//import com.x.base.core.project.logger.LoggerFactory;
//import com.x.server.console.action.RestoreStorage;
//
//import org.quartz.Job;
//import org.quartz.JobExecutionContext;
//import org.quartz.JobExecutionException;
//
//public class RestoreStorageTask implements Job {
//
// private static Logger logger = LoggerFactory.getLogger(RestoreStorageTask.class);
//
// @Override
// public void execute(JobExecutionContext arg0) throws JobExecutionException {
// try {
// logger.print("schedule restore storage task start, restore from:{}.",
// Config.currentNode().restoreData().path());
// RestoreStorage action = new RestoreStorage();
// action.execute(Config.currentNode().restoreStorage().path());
// } catch (Exception e) {
// throw new JobExecutionException(e);
// }
//
// }
//
//}
\ No newline at end of file
......@@ -2,10 +2,12 @@ package com.x.server.console;
import java.util.Properties;
import org.apache.commons.lang3.BooleanUtils;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
......@@ -21,44 +23,37 @@ public class SchedulerBuilder {
Scheduler scheduler = stdSchedulerFactory.getScheduler();
scheduler.start();
if (Config.currentNode().dumpData().enable() && Config.currentNode().dumpData().available()) {
JobDetail jobDetail = JobBuilder.newJob(DumpDataTask.class)
.withIdentity(DumpDataTask.class.getName(), scheduleGroup).withDescription(Config.node()).build();
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(DumpDataTask.class.getName(), scheduleGroup)
.withSchedule(CronScheduleBuilder.cronSchedule(Config.currentNode().dumpData().cron())).build();
scheduler.scheduleJob(jobDetail, trigger);
if (BooleanUtils.isTrue(Config.currentNode().dumpData().enable())
&& Config.currentNode().dumpData().available()) {
dumpDataTask(scheduleGroup, scheduler);
}
// if (Config.currentNode().dumpStorage().enable() && Config.currentNode().dumpStorage().available()) {
// JobDetail jobDetail = JobBuilder.newJob(DumpStorageTask.class)
// .withIdentity(DumpStorageTask.class.getName(), scheduleGroup).withDescription(Config.node())
// .build();
// Trigger trigger = TriggerBuilder.newTrigger().withIdentity(DumpStorageTask.class.getName(), scheduleGroup)
// .withSchedule(CronScheduleBuilder.cronSchedule(Config.currentNode().dumpStorage().cron())).build();
// scheduler.scheduleJob(jobDetail, trigger);
// }
if (Config.currentNode().restoreData().enable() && Config.currentNode().restoreData().available()) {
JobDetail jobDetail = JobBuilder.newJob(RestoreDataTask.class)
.withIdentity(RestoreDataTask.class.getName(), scheduleGroup).withDescription(Config.node())
.build();
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(RestoreDataTask.class.getName(), scheduleGroup)
.withSchedule(CronScheduleBuilder.cronSchedule(Config.currentNode().restoreData().cron())).build();
scheduler.scheduleJob(jobDetail, trigger);
if (BooleanUtils.isTrue(Config.currentNode().restoreData().enable())
&& Config.currentNode().restoreData().available()) {
restoreDataTask(scheduleGroup, scheduler);
}
// if (Config.currentNode().restoreStorage().enable() && Config.currentNode().restoreStorage().available()) {
// JobDetail jobDetail = JobBuilder.newJob(RestoreStorageTask.class)
// .withIdentity(RestoreStorageTask.class.getName(), scheduleGroup).withDescription(Config.node())
// .build();
// Trigger trigger = TriggerBuilder.newTrigger()
// .withIdentity(RestoreStorageTask.class.getName(), scheduleGroup)
// .withSchedule(CronScheduleBuilder.cronSchedule(Config.currentNode().restoreStorage().cron()))
// .build();
// scheduler.scheduleJob(jobDetail, trigger);
// }
this.registApplicationsAndVoteCenterTask(scheduler, scheduleGroup);
return scheduler;
}
private void restoreDataTask(String scheduleGroup, Scheduler scheduler) throws Exception {
JobDetail jobDetail = JobBuilder.newJob(RestoreDataTask.class)
.withIdentity(RestoreDataTask.class.getName(), scheduleGroup).withDescription(Config.node()).build();
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(RestoreDataTask.class.getName(), scheduleGroup)
.withSchedule(CronScheduleBuilder.cronSchedule(Config.currentNode().restoreData().cron())).build();
scheduler.scheduleJob(jobDetail, trigger);
}
private void dumpDataTask(String scheduleGroup, Scheduler scheduler) throws Exception {
JobDetail jobDetail = JobBuilder.newJob(DumpDataTask.class)
.withIdentity(DumpDataTask.class.getName(), scheduleGroup).withDescription(Config.node()).build();
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(DumpDataTask.class.getName(), scheduleGroup)
.withSchedule(CronScheduleBuilder.cronSchedule(Config.currentNode().dumpData().cron())).build();
scheduler.scheduleJob(jobDetail, trigger);
}
/* 更新node节点applications 和 选择center主节点 */
private void registApplicationsAndVoteCenterTask(Scheduler scheduler, String scheduleGroup) throws Exception {
JobDetail jobDetail = JobBuilder.newJob(RegistApplicationsAndVoteCenterTask.class)
......@@ -66,7 +61,7 @@ public class SchedulerBuilder {
.withDescription(Config.node()).build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(RegistApplicationsAndVoteCenterTask.class.getName(), scheduleGroup)
.withSchedule(CronScheduleBuilder.cronSchedule("0/15 * * * * ?")).build();
.withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?")).build();
scheduler.scheduleJob(jobDetail, trigger);
}
......@@ -77,8 +72,7 @@ public class SchedulerBuilder {
properties.setProperty("org.quartz.scheduler.rmi.proxy", "false");
properties.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransaction", "false");
properties.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
// properties.setProperty("org.quartz.threadPool.threadCount", "5");
properties.setProperty("org.quartz.threadPool.threadCount", "50");
properties.setProperty("org.quartz.threadPool.threadCount", "10");
properties.setProperty("org.quartz.threadPool.threadPriority", "5");
properties.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true");
properties.setProperty("org.quartz.jobStore.misfireThreshold", "60000");
......
......@@ -2,7 +2,7 @@ package com.x.server.console.node;
public interface Event {
public static final String TYPE_REGISTAPPLICATION = "registApplication";
public static final String TYPE_REFRESHAPPLICATIONS = "refreshApplications";
public static final String TYPE_REGISTAPPLICATIONS = "registApplications";
......@@ -10,6 +10,6 @@ public interface Event {
public static final String TYPE_VOTECENTER = "voteCenter";
public abstract void execute() throws Exception;
public abstract void execute();
}
......@@ -18,15 +18,20 @@ public class EventQueueExecutor extends Thread {
this.queue = queue;
}
@Override
public void run() {
while (true) {
Event event = null;
try {
JsonElement jsonElement = queue.take();
Event event = convert(jsonElement);
event.execute();
event = convert(jsonElement);
} catch (Exception e) {
logger.error(e);
}
if (null != event) {
event.execute();
}
}
}
......@@ -36,8 +41,8 @@ public class EventQueueExecutor extends Thread {
switch (type) {
case Event.TYPE_REGISTAPPLICATION:
return XGsonBuilder.instance().fromJson(jsonElement, RegistApplicationEvent.class);
case Event.TYPE_REFRESHAPPLICATIONS:
return XGsonBuilder.instance().fromJson(jsonElement, RefreshApplicationsEvent.class);
case Event.TYPE_REGISTAPPLICATIONS:
return XGsonBuilder.instance().fromJson(jsonElement, RegistApplicationsEvent.class);
......
package com.x.server.console.node;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import com.google.gson.Gson;
import com.x.base.core.project.Application;
import com.x.base.core.project.Applications;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.tools.DateTools;
import com.x.base.core.project.tools.ListTools;
public class RefreshApplicationsEvent implements Event {
private static Logger logger = LoggerFactory.getLogger(RefreshApplicationsEvent.class);
private static final Gson gson = XGsonBuilder.instance();
public final String type = Event.TYPE_REFRESHAPPLICATIONS;
public void execute() {
try {
if (null != Config.resource_node_applications()) {
Applications applications = applications();
if (null != applications) {
Date now = new Date();
boolean modify = this.refresh(applications, now);
if (modify) {
Config.resource_node_applications(gson.toJsonTree(applications));
}
Config.resource_node_applicationsTimestamp(now);
}
}
} catch (Exception e) {
logger.error(e);
}
}
private boolean refresh(Applications applications, Date date) throws Exception {
List<String> removeEntries = new ArrayList<>();
boolean modify = false;
for (Entry<String, CopyOnWriteArrayList<Application>> en : applications.entrySet()) {
List<Application> removeApplications = new ArrayList<>();
for (Application application : en.getValue()) {
long diffence = Math.abs((date.getTime() - application.getReportDate().getTime()));
if (diffence > (10 * 2 * 1000) + 5000) {
removeApplications.add(application);
logger.warn("cluster dropped application: {}, node: {}, report date: {}.", en.getKey(),
application.getNode(), DateTools.format(application.getReportDate()));
}
}
modify = en.getValue().removeAll(removeApplications) || modify;
if (en.getValue().isEmpty()) {
removeEntries.add(en.getKey());
}
}
if (ListTools.isNotEmpty(removeEntries)) {
modify = true;
for (String str : removeEntries) {
applications().remove(str);
}
}
return modify;
}
private Applications applications() throws Exception {
return gson.fromJson(Config.resource_node_applications(), Applications.class);
}
}
\ No newline at end of file
package com.x.server.console.node;
import java.util.Date;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.StringUtils;
import com.x.base.core.project.Application;
import com.x.base.core.project.Applications;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.gson.XGsonBuilder;
public class RegistApplicationEvent extends Application implements Event {
public final String type = Event.TYPE_REGISTAPPLICATION;
public void execute() throws Exception {
Applications applications = null;
if (null != Config.resource_node_applications()) {
applications = XGsonBuilder.instance().fromJson(Config.resource_node_applications(), Applications.class);
CopyOnWriteArrayList<Application> list = applications.get(this.getClassName());
if (null == list) {
list = new CopyOnWriteArrayList<Application>();
applications.put(this.getClassName(), list);
this.update(applications, list);
} else if (list.stream().filter(o -> {
return StringUtils.equals(o.getNode(), this.getNode());
}).count() == 0) {
this.update(applications, list);
}
} else {
applications = new Applications();
CopyOnWriteArrayList<Application> list = new CopyOnWriteArrayList<>();
applications.put(this.getClassName(), list);
this.update(applications, list);
}
}
private void update(Applications applications, CopyOnWriteArrayList<Application> list) throws Exception {
Application application = XGsonBuilder.convert(this, Application.class);
list.add(application);
Config.resource_node_applications(XGsonBuilder.instance().toJsonTree(applications));
Config.resource_node_applicationsTimestamp(new Date());
Config.resource_node_eventQueue().put(XGsonBuilder.instance().toJsonTree(new RegistApplicationsEvent()));
}
}
\ No newline at end of file
package com.x.server.console.node;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.jetty.quickstart.QuickStartWebApp;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import com.google.gson.Gson;
import com.x.base.core.project.AbstractContext;
import com.x.base.core.project.Application;
import com.x.base.core.project.Applications;
import com.x.base.core.project.annotation.FieldDescribe;
import com.x.base.core.project.config.CenterServer;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.connection.CipherConnectionAction;
import com.x.base.core.project.exception.RunningException;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.jaxrs.WrapString;
import com.x.base.core.project.tools.ListTools;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.server.console.server.Servers;
public class RegistApplicationsEvent implements Event {
private static Logger logger = LoggerFactory.getLogger(RegistApplicationsEvent.class);
private static final Gson gson = XGsonBuilder.instance();
public final String type = Event.TYPE_REGISTAPPLICATIONS;
public void execute() throws Exception {
// private AtomicInteger loop = new AtomicInteger(0);
public void execute() {
try {
if (BooleanUtils.isTrue(Servers.applicationServerIsStarted())
&& (null != Config.resource_node_applications())) {
List<Application> list = listApplication();
// list = removeNotRegisted(list);
if (BooleanUtils.isTrue(Config.currentNode().getSelfHealthCheckEnable()) && (!this.healthCheck(list))) {
logger.warn("health check result is false.");
list.clear();
}
if (null != Config.resource_node_applications()) {
// if (list.isEmpty() || 0 == loop.getAndUpdate(o -> (++o % 2))) {
Applications applications = XGsonBuilder.instance().fromJson(Config.resource_node_applications(),
Applications.class);
Req req = new Req();
List<Application> list = new ArrayList<>();
req.setNode(Config.node());
out: for (List<Application> o : applications.values()) {
for (Application application : o) {
if (StringUtils.equals(Config.node(), application.getNode())) {
list.add(application);
continue out;
}
req.setValue(gson.toJson(list));
for (Entry<String, CenterServer> entry : Config.nodes().centerServers().orderedEntry()) {
CipherConnectionAction.put(false, 1000, 2000,
Config.url_x_program_center_jaxrs(entry, "center", "regist", "applications"), req);
}
// }
}
} catch (Exception e) {
logger.error(e);
}
}
if (Servers.applicationServerIsRunning()) {
List<String> contextPaths = ListTools.extractProperty(list, "contextPath", String.class, true, true);
List<String> removes = new ArrayList<>();
GzipHandler gzipHandler = (GzipHandler) Servers.applicationServer.getHandler();
HandlerList hanlderList = (HandlerList) gzipHandler.getHandler();
for (Handler handler : hanlderList.getHandlers()) {
if (QuickStartWebApp.class.isAssignableFrom(handler.getClass())) {
QuickStartWebApp app = (QuickStartWebApp) handler;
if (!contextPaths.contains(app.getContextPath())) {
removes.add(app.getContextPath());
}
}
}
if (!removes.isEmpty()) {
list = list.stream().filter(o -> {
return !removes.contains(o.getContextPath());
}).collect(Collectors.toList());
// private List<Application> removeNotRegisted(List<Application> list) throws Exception {
// List<String> contextPaths = ListTools.extractProperty(list, "contextPath", String.class, true, true);
// List<String> removes = new ArrayList<>();
// GzipHandler gzipHandler = (GzipHandler) Servers.applicationServer.getHandler();
// HandlerList hanlderList = (HandlerList) gzipHandler.getHandler();
// for (Handler handler : hanlderList.getHandlers()) {
// if (QuickStartWebApp.class.isAssignableFrom(handler.getClass())) {
// QuickStartWebApp app = (QuickStartWebApp) handler;
// if ((!contextPaths.contains(app.getContextPath())) || (!app.isStarted())) {
// removes.add(app.getContextPath());
// }
// }
// }
// if (!removes.isEmpty()) {
// list = list.stream().filter(o -> !removes.contains(o.getContextPath())).collect(Collectors.toList());
// }
// return list;
// }
// 保证从已经regist中取出,否则可能在启动阶段即被访问
// private List<Application> listRegistedApplication() throws Exception {
// Applications applications = gson.fromJson(Config.resource_node_applications(), Applications.class);
// List<Application> list = new ArrayList<>();
// final String node = Config.node();
// for (List<Application> o : applications.values()) {
// for (Application application : o) {
// if (StringUtils.equals(node, application.getNode())) {
// list.add(application);
// }
// }
// }
// return list;
// }
private List<Application> listApplication() throws Exception {
List<Application> list = new ArrayList<>();
GzipHandler gzipHandler = (GzipHandler) Servers.applicationServer.getHandler();
HandlerList hanlderList = (HandlerList) gzipHandler.getHandler();
for (Handler handler : hanlderList.getHandlers()) {
if (QuickStartWebApp.class.isAssignableFrom(handler.getClass())) {
QuickStartWebApp app = (QuickStartWebApp) handler;
if (app.isStarted()) {
list.add(gson.fromJson(app.getServletContext()
.getAttribute(AbstractContext.SERVLETCONTEXT_ATTRIBUTE_APPLICATION).toString(),
Application.class));
}
}
}
return list;
}
private boolean healthCheck(List<Application> list) {
List<CompletableFuture<Long>> futures = new ArrayList<>();
try {
for (Application o : list) {
futures.add(healthCheckTask(o));
}
long min = 0;
long max = 0;
for (CompletableFuture<Long> future : futures) {
long difference = future.get(3000, TimeUnit.MILLISECONDS);
min = Math.min(min, difference);
max = Math.max(max, difference);
}
if (max > 30 * 1000) {
logger.warn("server time difference is too large: {}ms.", max);
}
if (min < 0) {
return false;
}
} catch (Exception e) {
logger.error(new RunningException(e, "health check error."));
Thread.currentThread().interrupt();
return false;
}
return true;
}
private CompletableFuture<Long> healthCheckTask(Application application) {
return CompletableFuture.supplyAsync(() -> {
try {
Resp resp = CipherConnectionAction.get(false, 1000, 1000, application, "echo").getData(Resp.class);
Date date = resp.getServerTime();
return Math.abs(date.getTime() - ((new Date()).getTime()));
} catch (Exception e) {
logger.error(new RunningException(e, "health check failure:{},{}.", application.getNode(),
application.getContextPath()));
}
return -1L;
}, Inner.executorService);
}
Req req = new Req();
private static class Inner {
private static final ExecutorService executorService = Executors.newFixedThreadPool(5,
new BasicThreadFactory.Builder().namingPattern("RegistApplicationsEvent-healthCheck-%d").daemon(true)
.build());
}
req.setValue(XGsonBuilder.toJson(list));
public static class Resp {
for (Entry<String, CenterServer> entry : Config.nodes().centerServers().orderedEntry()) {
CipherConnectionAction.put(false,
Config.url_x_program_center_jaxrs(entry, "center", "regist", "applications"), req);
@FieldDescribe("上下文根")
private String servletContextName;
}
@FieldDescribe("服务器时间")
private Date serverTime;
public String getServletContextName() {
return servletContextName;
}
public void setServletContextName(String servletContextName) {
this.servletContextName = servletContextName;
}
public Date getServerTime() {
return serverTime;
}
Config.resource_node_eventQueue().put(XGsonBuilder.instance().toJsonTree(new UpdateApplicationsEvent()));
public void setServerTime(Date serverTime) {
this.serverTime = serverTime;
}
}
public static class Req extends WrapString {
private static final long serialVersionUID = -2855209663719641934L;
private String node;
public String getNode() {
return node;
}
public void setNode(String node) {
this.node = node;
}
}
}
\ No newline at end of file
package com.x.server.console.node;
import org.apache.commons.lang3.BooleanUtils;
import com.x.base.core.project.Applications;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.connection.ActionResponse;
import com.x.base.core.project.connection.CipherConnectionAction;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.server.console.server.Servers;
public class UpdateApplicationsEvent implements Event {
private static Logger logger = LoggerFactory.getLogger(UpdateApplicationsEvent.class);
public final String type = Event.TYPE_UPDATEAPPLICATIONS;
public void execute() throws Exception {
if (Servers.applicationServerIsRunning()) {
ActionResponse respone = CipherConnectionAction.get(false,
Config.url_x_program_center_jaxrs("center", "applications"));
Applications applications = respone.getData(Applications.class);
Config.resource_node_applications(XGsonBuilder.instance().toJsonTree(applications));
public void execute() {
try {
if (BooleanUtils.isTrue(Servers.applicationServerIsRunning())) {
ActionResponse respone = CipherConnectionAction.get(false, 1000, 2000,
Config.url_x_program_center_jaxrs("center", "applications"));
Applications applications = respone.getData(Applications.class);
Config.resource_node_applications(XGsonBuilder.instance().toJsonTree(applications));
}
} catch (Exception e) {
logger.error(e);
}
}
}
......@@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
......@@ -12,6 +13,7 @@ import com.x.base.core.project.config.CenterServer;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.connection.ActionResponse;
import com.x.base.core.project.connection.CipherConnectionAction;
import com.x.base.core.project.exception.RunningException;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
......@@ -21,14 +23,13 @@ public class VoteCenterEvent implements Event {
public final String type = Event.TYPE_VOTECENTER;
public void execute() throws Exception {
List<Entry<String, CenterServer>> list = Config.nodes().centerServers().orderedEntry();
private AtomicInteger loop = new AtomicInteger(0);
public void execute() {
List<Entry<String, CenterServer>> list = listCenterServer();
for (Entry<String, CenterServer> entry : list) {
try {
ActionResponse response = CipherConnectionAction.get(false,
ActionResponse response = CipherConnectionAction.get(false, 1000, 2000,
Config.url_x_program_center_jaxrs(entry, "echo"));
JsonElement jsonElement = response.getData(JsonElement.class);
if (null != jsonElement && (!jsonElement.isJsonNull())) {
......@@ -44,12 +45,20 @@ public class VoteCenterEvent implements Event {
return;
}
} catch (Exception e) {
// logger.warn("failed to connect center: {}, port: {}, sslEnable: {}.",
// entry.getKey(),
// entry.getValue().getPort(), entry.getValue().getSslEnable());
if (loop.getAndUpdate(o -> ++o % 10) == 0) {
logger.warn("vote center error:{}, message:{}.", entry.getKey(), e.getMessage());
}
}
}
}
private List<Entry<String, CenterServer>> listCenterServer() {
try {
return Config.nodes().centerServers().orderedEntry();
} catch (Exception e) {
logger.error(new RunningException(e, "list center server:{}."));
}
return new ArrayList<>();
}
private String nodes(List<Entry<String, CenterServer>> list) {
......
......@@ -24,6 +24,27 @@ public class Servers {
public static FtpServer storageServer;
public static DataTcpWebServer dataServer;
public static Boolean webServerIsStarted() {
if (null == webServer) {
return false;
}
return webServer.isStarted();
}
public static Boolean applicationServerIsStarted() {
if (null == applicationServer) {
return false;
}
return applicationServer.isStarted();
}
public static Boolean centerServerIsStarted() {
if (null == centerServer) {
return false;
}
return centerServer.isStarted();
}
public static Boolean webServerIsRunning() {
if (null == webServer) {
return false;
......
package com.x.processplatform.service.processing.jaxrs.test;
import java.util.concurrent.Callable;
import com.x.base.core.project.executor.ProcessPlatformExecutorFactory;
import com.x.base.core.project.gson.GsonPropertyObject;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.jaxrs.WrapString;
import com.x.processplatform.service.processing.ThisApplication;
class ActionTest extends BaseAction {
ActionResult<Wo> execute(EffectivePerson effectivePerson, String job) throws Exception {
Callable<ActionResult<Wo>> callable = () -> {
ActionResult<Wo> result = new ActionResult<>();
Wo wo = new Wo();
result.setData(wo);
System.err.println("!!!!!!!!!!!!!!!!!!!start sleep " + ProcessPlatformExecutorFactory.get(job));
Thread.sleep(30000);
System.err.println("!!!!!!!!!!!!!!!!!!!start completed " + ProcessPlatformExecutorFactory.get(job));
return result;
};
return ProcessPlatformExecutorFactory.get(job).submit(callable).get();
ActionResult<Wo> execute(EffectivePerson effectivePerson) throws Exception {
ActionResult<Wo> result = new ActionResult<>();
Wo wo = new Wo();
wo.setValue(XGsonBuilder.toJson(ThisApplication.context().applications()));
result.setData(wo);
return result;
}
public static class Wo extends GsonPropertyObject {
public static class Wo extends WrapString {
private static final long serialVersionUID = 1L;
......
......@@ -4,7 +4,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
......@@ -163,17 +162,16 @@ public class TestAction extends StandardJaxrsAction {
asyncResponse.resume(ResponseFactory.getEntityTagActionResultResponse(request, result));
}
@JaxrsMethodDescribe(value = "测试ExecutorStatus.", action = ActionTest.class)
@JaxrsMethodDescribe(value = "测试test.", action = ActionTest.class)
@GET
@Path("test/{job}")
@Path("test")
@Produces(HttpMediaType.APPLICATION_JSON_UTF_8)
@Consumes(MediaType.APPLICATION_JSON)
public void urge(@Suspended final AsyncResponse asyncResponse, @Context HttpServletRequest request,
@PathParam("job") String job) {
public void test(@Suspended final AsyncResponse asyncResponse, @Context HttpServletRequest request) {
ActionResult<ActionTest.Wo> result = new ActionResult<>();
EffectivePerson effectivePerson = this.effectivePerson(request);
try {
result = new ActionTest().execute(effectivePerson, job);
result = new ActionTest().execute(effectivePerson);
} catch (Exception e) {
logger.error(e, effectivePerson, request, null);
result.error(e);
......
package com.x.program.center;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.StringUtils;
import com.x.base.core.project.Application;
import com.x.base.core.project.Applications;
import com.x.base.core.project.config.Config;
......@@ -13,82 +15,75 @@ import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.queue.AbstractQueue;
import com.x.base.core.project.tools.DateTools;
import com.x.base.core.project.tools.ListTools;
public class CenterQueue extends AbstractQueue<CenterQueueBody> {
private static Logger logger = LoggerFactory.getLogger(CenterQueue.class);
//public static final int REFRESHAPPLICATIONSINTERVAL = 45;
public static final int REFRESHAPPLICATIONSINTERVAL = 30;
protected void execute(CenterQueueBody body) throws Exception {
switch (body.type()) {
case CenterQueueBody.TYPE_REGISTAPPLICATIONS:
CenterQueueRegistApplicationsBody centerQueueRegistApplicationsBody = (CenterQueueRegistApplicationsBody) body;
registApplications(centerQueueRegistApplicationsBody);
break;
case CenterQueueBody.TYPE_REFRESHAPPLICATION:
this.refresh((CenterQueueRefreshBody) body);
break;
default:
break;
}
// switch (body.type()) {
// case CenterQueueBody.TYPE_REGISTAPPLICATIONS:
// CenterQueueRegistApplicationsBody centerQueueRegistApplicationsBody = (CenterQueueRegistApplicationsBody) body;
// registApplications(centerQueueRegistApplicationsBody);
// break;
//// case CenterQueueBody.TYPE_REFRESHAPPLICATION:
//// this.refresh((CenterQueueRefreshBody) body);
//// break;
// default:
// break;
if (StringUtils.equals(body.type(), CenterQueueBody.TYPE_REGISTAPPLICATIONS)) {
CenterQueueRegistApplicationsBody centerQueueRegistApplicationsBody = (CenterQueueRegistApplicationsBody) body;
registApplications(centerQueueRegistApplicationsBody);
}
}
private void registApplications(CenterQueueRegistApplicationsBody centerQueueRegistApplicationsBody)
throws Exception {
Applications applications = ThisApplication.context().applications();
Date now = new Date();
for (Application body : centerQueueRegistApplicationsBody) {
Application application = applications.get(body.getClassName(), body.getNode());
if (null != application) {
application.setReportDate(now);
} else {
if (ListTools.isNotEmpty(applications.get(body.getClassName()))) {
logger.print("cluster add application: {}, node: {}.", body.getNode(), body.getClassName());
if (centerQueueRegistApplicationsBody.isEmpty()
&& StringUtils.isNotEmpty(centerQueueRegistApplicationsBody.getNode())) {
this.clearNode(applications, centerQueueRegistApplicationsBody.getNode());
} else {
for (Application body : centerQueueRegistApplicationsBody) {
Application application = applications.get(body.getClassName(), body.getNode());
if (null != application) {
application.setReportDate(now);
} else {
if (ListTools.isNotEmpty(applications.get(body.getClassName()))) {
logger.print("cluster add application: {}, node: {}.", body.getNode(), body.getClassName());
}
body.setReportDate(now);
applications.add(body.getClassName(), body);
applications.updateTimestamp(now);
}
body.setReportDate(now);
applications.add(body.getClassName(), body);
Config.resource_node_applicationsTimestamp(now);
applications.updateTimestamp(now);
}
}
Config.resource_node_applicationsTimestamp(now);
Config.resource_node_applications(XGsonBuilder.instance().toJsonTree(applications));
}
private void refresh(CenterQueueRefreshBody body) throws Exception {
Applications applications = ThisApplication.context().applications();
Date now = new Date();
List<String> removeEntries = new ArrayList<>();
boolean modify = false;
for (Entry<String, CopyOnWriteArrayList<Application>> en : applications.entrySet()) {
List<Application> removeApplications = new ArrayList<>();
for (Application application : en.getValue()) {
if ((now.getTime() - application.getReportDate().getTime()) > REFRESHAPPLICATIONSINTERVAL * 2 * 1000) {
private void clearNode(Applications applications, String node) {
Set<String> removeEntries = new HashSet<>();
for (Entry<String, CopyOnWriteArrayList<Application>> entry : applications.entrySet()) {
Set<Application> removeApplications = new HashSet<>();
for (Application application : entry.getValue()) {
if (StringUtils.equals(application.getNode(), node)) {
removeApplications.add(application);
logger.warn("cluster dropped application: {}, node: {}, report date: {}.", en.getKey(),
application.getNode(), DateTools.format(application.getReportDate()));
}
}
modify = en.getValue().removeAll(removeApplications) || modify;
if (en.getValue().isEmpty()) {
removeEntries.add(en.getKey());
entry.getValue().removeAll(removeApplications);
if (entry.getValue().isEmpty()) {
removeEntries.add(entry.getKey());
}
}
if (ListTools.isNotEmpty(removeEntries)) {
modify = true;
for (String str : removeEntries) {
ThisApplication.context().applications().remove(str);
}
}
if (modify) {
applications.updateTimestamp(now);
Config.resource_node_applicationsTimestamp(now);
Config.resource_node_applications(XGsonBuilder.instance().toJsonTree(applications));
for (String key : removeEntries) {
applications.remove(key);
}
}
}
......@@ -8,8 +8,18 @@ public class CenterQueueRegistApplicationsBody extends ArrayList<Application> im
private static final long serialVersionUID = -6222059999168636606L;
private String node;
public String type() {
return TYPE_REGISTAPPLICATIONS;
}
public String getNode() {
return node;
}
public void setNode(String node) {
this.node = node;
}
}
......@@ -57,15 +57,15 @@ public class Context extends AbstractContext {
private static Logger logger = LoggerFactory.getLogger(Context.class);
@Override
public Applications applications() throws Exception {
return applications;
}
// @Override
// public Applications applications() throws Exception {
// return applications;
// }
@Override
public ThreadFactory threadFactory() {
return threadFactory;
}
// @Override
// public ThreadFactory threadFactory() {
// return threadFactory;
// }
/* 应用的磁盘路径 */
private volatile String path;
......@@ -169,7 +169,7 @@ public class Context extends AbstractContext {
context.servletContextName = servletContext.getServletContextName();
context.clazz = Class.forName(servletContextEvent.getServletContext().getInitParameter(INITPARAMETER_PORJECT));
context.initDatas();
context.threadFactory = new ThreadFactory(context);
// context.threadFactory = new ThreadFactory(context);
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
context.checkDefaultRole(emc);
}
......
......@@ -92,8 +92,8 @@ public class ThisApplication {
context().scheduleLocal(WeLinkSyncOrganizationTrigger.class, Config.weLink().getForceSyncCron());
}
context().scheduleLocal(RefreshApplications.class, CenterQueue.REFRESHAPPLICATIONSINTERVAL,
CenterQueue.REFRESHAPPLICATIONSINTERVAL);
// context().scheduleLocal(RefreshApplications.class, CenterQueue.REFRESHAPPLICATIONSINTERVAL,
// CenterQueue.REFRESHAPPLICATIONSINTERVAL);
// 运行间隔由300秒缩减到120秒
context().scheduleLocal(FireSchedule.class, 180, 120);
context().scheduleLocal(CleanupCode.class, 10, 60 * 30);
......
......@@ -20,6 +20,8 @@ class ActionRegistApplications extends BaseAction {
CenterQueueRegistApplicationsBody body = gson.fromJson(wi.getValue(), CenterQueueRegistApplicationsBody.class);
body.setNode(wi.getNode());
ThisApplication.centerQueue.send(body);
Wo wo = new Wo();
......@@ -34,6 +36,16 @@ class ActionRegistApplications extends BaseAction {
public static class Wi extends WrapString {
private String node;
public String getNode() {
return node;
}
public void setNode(String node) {
this.node = node;
}
}
}
\ No newline at end of file
......@@ -82,35 +82,33 @@ class ActionInstallOrUpdate extends BaseAction {
if (BooleanUtils.isTrue(Config.collect().getEnable())) {
String token = business.loginCollect();
if (StringUtils.isNotEmpty(token)) {
byte[] bytes = ConnectionAction.getFile(
byte[] bytes = ConnectionAction.getBinary(
Config.collect().url(Collect.ADDRESS_COLLECT_APPLICATION_DOWN + "/" + id),
ListTools.toList(new NameValuePair(Collect.COLLECT_TOKEN, token)));
if (bytes != null) {
if ((null != bytes) && (bytes.length > 0)) {
InstallData installData = this.install(id, bytes);
if (installData != null) {
wo.setValue(true);
emc.beginTransaction(InstallLog.class);
InstallLog installLog = emc.find(id, InstallLog.class);
boolean exist = true;
if (installLog == null) {
installLog = new InstallLog();
installLog.setId(app.getId());
exist = false;
}
installLog.setName(app.getName());
installLog.setVersion(app.getVersion());
installLog.setCategory(app.getCategory());
installLog.setStatus(CommonStatus.VALID.getValue());
installLog.setData(gson.toJson(installData));
installLog.setInstallPerson(effectivePerson.getDistinguishedName());
installLog.setInstallTime(new Date());
installLog.setUnInstallPerson(null);
installLog.setUnInstallTime(null);
if (!exist) {
emc.persist(installLog);
}
emc.commit();
wo.setValue(true);
emc.beginTransaction(InstallLog.class);
InstallLog installLog = emc.find(id, InstallLog.class);
boolean exist = true;
if (installLog == null) {
installLog = new InstallLog();
installLog.setId(app.getId());
exist = false;
}
installLog.setName(app.getName());
installLog.setVersion(app.getVersion());
installLog.setCategory(app.getCategory());
installLog.setStatus(CommonStatus.VALID.getValue());
installLog.setData(gson.toJson(installData));
installLog.setInstallPerson(effectivePerson.getDistinguishedName());
installLog.setInstallTime(new Date());
installLog.setUnInstallPerson(null);
installLog.setUnInstallTime(null);
if (!exist) {
emc.persist(installLog);
}
emc.commit();
}
}
}
......@@ -130,7 +128,7 @@ class ActionInstallOrUpdate extends BaseAction {
File dist = new File(tempFile.getAbsolutePath(), "data");
FileTools.forceMkdir(dist);
JarTools.unjar(zipFile, new ArrayList<>(), dist, true);
//过滤必要的文件
// 过滤必要的文件
File[] files = dist.listFiles(new FileFilter() {
public boolean accept(File pathname) {
return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册