提交 79f31784 编写于 作者: J Jason Song 提交者: GitHub

Merge pull request #547 from lepdou/hermes

send msg to mq when namespace published
......@@ -215,5 +215,8 @@ public class PortalConfig extends RefreshableConfig {
return getValue("clogging.server.port");
}
public String hermesServerAddress() {
return getValue("hermes.server.address");
}
}
......@@ -2,6 +2,7 @@ package com.ctrip.framework.apollo.portal.listener;
import com.ctrip.framework.apollo.common.constants.ReleaseOperation;
import com.ctrip.framework.apollo.core.enums.Env;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.portal.component.config.PortalConfig;
import com.ctrip.framework.apollo.portal.component.emailbuilder.GrayPublishEmailBuilder;
import com.ctrip.framework.apollo.portal.component.emailbuilder.MergeEmailBuilder;
......@@ -11,12 +12,18 @@ import com.ctrip.framework.apollo.portal.entity.bo.Email;
import com.ctrip.framework.apollo.portal.entity.bo.ReleaseHistoryBO;
import com.ctrip.framework.apollo.portal.service.ReleaseHistoryService;
import com.ctrip.framework.apollo.portal.spi.EmailService;
import com.ctrip.framework.apollo.portal.spi.MQService;
import com.ctrip.framework.apollo.tracer.Tracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
@Component
public class ConfigPublishListener {
......@@ -34,75 +41,107 @@ public class ConfigPublishListener {
private MergeEmailBuilder mergeEmailBuilder;
@Autowired
private PortalConfig portalConfig;
@Autowired
private MQService mqService;
private ExecutorService executorService;
@PostConstruct
public void init() {
executorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("ConfigPublishNotify", false));
}
@EventListener
public void onConfigPublish(ConfigPublishEvent event) {
Env env = event.getConfigPublishInfo().getEnv();
if (!portalConfig.emailSupportedEnvs().contains(env)) {
return;
}
executorService.submit(new ConfigPublishNotifyTask(event.getConfigPublishInfo()));
}
ReleaseHistoryBO releaseHistory = getReleaseHistory(event);
if (releaseHistory == null) {
Tracer.logError("Will not send email, because load release history error", null);
return;
}
private class ConfigPublishNotifyTask implements Runnable {
int realOperation = releaseHistory.getOperation();
private ConfigPublishEvent.ConfigPublishInfo publishInfo;
Email email = null;
try {
email = buildEmail(env, releaseHistory, realOperation);
} catch (Throwable e) {
Tracer.logError("build email failed.", e);
ConfigPublishNotifyTask(ConfigPublishEvent.ConfigPublishInfo publishInfo) {
this.publishInfo = publishInfo;
}
if (email != null) {
emailService.send(email);
@Override
public void run() {
ReleaseHistoryBO releaseHistory = getReleaseHistory();
if (releaseHistory == null) {
Tracer.logError("Load release history failed", null);
return;
}
sendPublishEmail(releaseHistory);
sendPublishMsg(releaseHistory);
}
}
private ReleaseHistoryBO getReleaseHistory(ConfigPublishEvent event) {
ConfigPublishEvent.ConfigPublishInfo info = event.getConfigPublishInfo();
Env env = info.getEnv();
private ReleaseHistoryBO getReleaseHistory() {
Env env = publishInfo.getEnv();
int operation = info.isMergeEvent() ? ReleaseOperation.GRAY_RELEASE_MERGE_TO_MASTER :
info.isRollbackEvent() ? ReleaseOperation.ROLLBACK :
info.isNormalPublishEvent() ? ReleaseOperation.NORMAL_RELEASE :
info.isGrayPublishEvent() ? ReleaseOperation.GRAY_RELEASE : -1;
int operation = publishInfo.isMergeEvent() ? ReleaseOperation.GRAY_RELEASE_MERGE_TO_MASTER :
publishInfo.isRollbackEvent() ? ReleaseOperation.ROLLBACK :
publishInfo.isNormalPublishEvent() ? ReleaseOperation.NORMAL_RELEASE :
publishInfo.isGrayPublishEvent() ? ReleaseOperation.GRAY_RELEASE : -1;
if (operation == -1) {
return null;
}
if (operation == -1) {
return null;
}
if (info.isRollbackEvent()) {
return releaseHistoryService
.findLatestByPreviousReleaseIdAndOperation(env, info.getPreviousReleaseId(), operation);
} else {
return releaseHistoryService.findLatestByReleaseIdAndOperation(env, info.getReleaseId(), operation);
}
if (publishInfo.isRollbackEvent()) {
return releaseHistoryService
.findLatestByPreviousReleaseIdAndOperation(env, publishInfo.getPreviousReleaseId(), operation);
} else {
return releaseHistoryService.findLatestByReleaseIdAndOperation(env, publishInfo.getReleaseId(), operation);
}
}
}
private void sendPublishEmail(ReleaseHistoryBO releaseHistory) {
Env env = publishInfo.getEnv();
private Email buildEmail(Env env, ReleaseHistoryBO releaseHistory, int operation) {
switch (operation) {
case ReleaseOperation.GRAY_RELEASE: {
return grayPublishEmailBuilder.build(env, releaseHistory);
if (!portalConfig.emailSupportedEnvs().contains(env)) {
return;
}
case ReleaseOperation.NORMAL_RELEASE: {
return normalPublishEmailBuilder.build(env, releaseHistory);
int realOperation = releaseHistory.getOperation();
Email email = null;
try {
email = buildEmail(env, releaseHistory, realOperation);
} catch (Throwable e) {
Tracer.logError("build email failed.", e);
}
case ReleaseOperation.ROLLBACK: {
return rollbackEmailBuilder.build(env, releaseHistory);
if (email != null) {
emailService.send(email);
}
case ReleaseOperation.GRAY_RELEASE_MERGE_TO_MASTER: {
return mergeEmailBuilder.build(env, releaseHistory);
}
private void sendPublishMsg(ReleaseHistoryBO releaseHistory) {
mqService.sendPublishMsg(publishInfo.getEnv(), releaseHistory);
}
private Email buildEmail(Env env, ReleaseHistoryBO releaseHistory, int operation) {
switch (operation) {
case ReleaseOperation.GRAY_RELEASE: {
return grayPublishEmailBuilder.build(env, releaseHistory);
}
case ReleaseOperation.NORMAL_RELEASE: {
return normalPublishEmailBuilder.build(env, releaseHistory);
}
case ReleaseOperation.ROLLBACK: {
return rollbackEmailBuilder.build(env, releaseHistory);
}
case ReleaseOperation.GRAY_RELEASE_MERGE_TO_MASTER: {
return mergeEmailBuilder.build(env, releaseHistory);
}
default:
return null;
}
default:
return null;
}
}
}
package com.ctrip.framework.apollo.portal.spi;
import com.ctrip.framework.apollo.core.enums.Env;
import com.ctrip.framework.apollo.portal.entity.bo.ReleaseHistoryBO;
public interface MQService {
void sendPublishMsg(Env env, ReleaseHistoryBO releaseHistory);
}
package com.ctrip.framework.apollo.portal.spi.configuration;
import com.ctrip.framework.apollo.portal.spi.ctrip.CtripMQService;
import com.ctrip.framework.apollo.portal.spi.defaultimpl.DefaultMQService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Configuration
public class MQConfiguration {
@Configuration
@Profile("ctrip")
public static class CtripMQConfiguration {
@Bean
public CtripMQService mqService() {
return new CtripMQService();
}
}
/**
* spring.profiles.active != ctrip
*/
@Configuration
@Profile({"!ctrip"})
public static class DefaultMQConfiguration {
@Bean
public DefaultMQService mqService() {
return new DefaultMQService();
}
}
}
package com.ctrip.framework.apollo.portal.spi.ctrip;
import com.google.gson.Gson;
import com.ctrip.framework.apollo.common.dto.ReleaseDTO;
import com.ctrip.framework.apollo.common.entity.App;
import com.ctrip.framework.apollo.core.enums.Env;
import com.ctrip.framework.apollo.portal.component.config.PortalConfig;
import com.ctrip.framework.apollo.portal.entity.bo.ReleaseHistoryBO;
import com.ctrip.framework.apollo.portal.service.AppService;
import com.ctrip.framework.apollo.portal.service.ReleaseService;
import com.ctrip.framework.apollo.portal.spi.MQService;
import com.ctrip.framework.apollo.tracer.Tracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import java.util.Arrays;
import javax.annotation.PostConstruct;
public class CtripMQService implements MQService {
private static final org.apache.commons.lang.time.FastDateFormat
TIMESTAMP_FORMAT = org.apache.commons.lang.time.FastDateFormat.getInstance("yyyy-MM-dd hh:mm:ss");
private static final String CONFIG_PUBLISH_NOTIFY_TO_NOC_TOPIC = "ops.noc.record.created";
private Gson gson = new Gson();
@Autowired
private AppService appService;
@Autowired
private ReleaseService releaseService;
@Autowired
private PortalConfig portalConfig;
private RestTemplate restTemplate;
@PostConstruct
public void init() {
restTemplate = new RestTemplate();
SimpleClientHttpRequestFactory rf = (SimpleClientHttpRequestFactory) restTemplate.getRequestFactory();
rf.setReadTimeout(portalConfig.readTimeout());
rf.setConnectTimeout(portalConfig.connectTimeout());
MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
converter.setSupportedMediaTypes(
Arrays.asList(MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM));
restTemplate.setMessageConverters(Arrays.asList(converter, new FormHttpMessageConverter()));
}
@Override
public void sendPublishMsg(Env env, ReleaseHistoryBO releaseHistory) {
if (releaseHistory == null) {
return;
}
PublishMsg msg = buildPublishMsg(env, releaseHistory);
sendMsg(portalConfig.hermesServerAddress(), CONFIG_PUBLISH_NOTIFY_TO_NOC_TOPIC, msg);
}
private PublishMsg buildPublishMsg(Env env, ReleaseHistoryBO releaseHistory) {
PublishMsg msg = new PublishMsg();
msg.setPriority("中");
msg.setTool_origin("Apollo");
String appId = releaseHistory.getAppId();
App app = appService.load(appId);
msg.setInfluence_bu(app.getOrgId());
msg.setAppid(appId);
ReleaseDTO release = releaseService.findReleaseById(env, releaseHistory.getReleaseId());
msg.setAssginee(release.getDataChangeCreatedBy());
msg.setDesc(
gson.toJson(releaseService.compare(env, releaseHistory.getPreviousReleaseId(), releaseHistory.getReleaseId())));
msg.setOperation_time(TIMESTAMP_FORMAT.format(release.getDataChangeCreatedTime()));
return msg;
}
private void sendMsg(String serverAddress, String topic, Object msg) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.parseMediaType(MediaType.APPLICATION_OCTET_STREAM + ";charset=UTF-8"));
HttpEntity<Object> request = new HttpEntity<>(msg, headers);
try {
//send msg by hermes RestAPI
restTemplate.postForObject(serverAddress + "/topics/" + topic, request, Object.class);
} catch (Exception e) {
Tracer.logError("Send publish msg to hermes failed", e);
}
}
private class PublishMsg {
private String assginee;
private String desc;
private String operation_time;
private String tool_origin;
private String priority;
private String influence_bu;
private String appid;
public String getAssginee() {
return assginee;
}
public void setAssginee(String assginee) {
this.assginee = assginee;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public String getOperation_time() {
return operation_time;
}
public void setOperation_time(String operation_time) {
this.operation_time = operation_time;
}
public String getTool_origin() {
return tool_origin;
}
public void setTool_origin(String tool_origin) {
this.tool_origin = tool_origin;
}
public String getPriority() {
return priority;
}
public void setPriority(String priority) {
this.priority = priority;
}
public String getInfluence_bu() {
return influence_bu;
}
public void setInfluence_bu(String influence_bu) {
this.influence_bu = influence_bu;
}
public String getAppid() {
return appid;
}
public void setAppid(String appid) {
this.appid = appid;
}
}
}
package com.ctrip.framework.apollo.portal.spi.defaultimpl;
import com.ctrip.framework.apollo.core.enums.Env;
import com.ctrip.framework.apollo.portal.entity.bo.ReleaseHistoryBO;
import com.ctrip.framework.apollo.portal.spi.MQService;
public class DefaultMQService implements MQService{
@Override
public void sendPublishMsg(Env env, ReleaseHistoryBO releaseHistory) {
//do nothing
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册