diff --git a/o2server/configSample/mq.json b/o2server/configSample/mq.json index 89694d2007ce84a4166c40d02ed72a4e2cf37ffe..6149e4359914f1e2ef70ce1181034b3a49e12226 100755 --- a/o2server/configSample/mq.json +++ b/o2server/configSample/mq.json @@ -23,8 +23,14 @@ "activeMQ":{ "url":"tcp://127.0.0.1:61616", "queueName":"queue-test", + "keyStore":"C:/Users/wwx/client.ks", + "trustStore":"C:/Users/wwx/client.ts", + "keyStorePassword":"password", "###url": "服务地址,端口默认61616.###", - "###queueName": "要创建的消息名称###" + "###queueName": "要创建的消息名称###", + "###keyStore": "密钥文件存储路径###", + "###trustStore": "证书文件存储路径###", + "###keyStorePassword": "密钥密码###" }, "###enable": "是否启用.###", "###mq": "消息服务类型.###" diff --git a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java index 86e435044a03c578072fea5c8fd9d4166896a9ae..e609471ec3ddabb774f49ed0696cf2b53fbcff1d 100644 --- a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java +++ b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java @@ -22,6 +22,15 @@ public class MQActive extends ConfigObject { @FieldDescribe("消息队列名") private String queueName; + @FieldDescribe("密钥文件存储路径") + private String keyStore; + + @FieldDescribe("证书文件存储路径") + private String trustStore; + + @FieldDescribe("密钥密码") + private String keyStorePassword; + public static MQActive defaultInstance() { return new MQActive(); } @@ -50,6 +59,30 @@ public class MQActive extends ConfigObject { public void setQueueName(String queueName) { this.queueName = queueName; } + + public String getKeyStore() { + return keyStore; + } + + public void setKeyStore(String keyStore) { + this.keyStore = keyStore; + } + + public String getTrustStore() { + return trustStore; + } + + public void setTrustStore(String trustStore) { + this.trustStore = trustStore; + } + + public String getKeyStorePassword() { + return keyStorePassword; + } + + public void setKeyStorePassword(String keyStorePassword) { + this.keyStorePassword = keyStorePassword; + } diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java index f4ea4493ef88900ce62020cc47b56d337341f3ff..df47de79d0b61e85912b47a65e140e1e8f649d34 100644 --- a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java @@ -10,6 +10,8 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSslConnectionFactory; + import com.google.gson.Gson; import com.x.base.core.project.config.Config; import com.x.base.core.project.config.MQActive; @@ -17,6 +19,13 @@ import com.x.base.core.project.logger.Logger; import com.x.base.core.project.logger.LoggerFactory; import com.x.message.core.entity.Message; +import java.io.FileInputStream; +import java.security.KeyStore; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; + public class ActiveMQ implements MQInterface { private static Logger logger = LoggerFactory.getLogger(ActiveMQ.class); @@ -28,16 +37,37 @@ public class ActiveMQ implements MQInterface { try { MQActive configMQ = Config.mq().getActiveMQ(); logger.info("MqActive initialize....."); - - String url=configMQ.getUrl(); String queueName=configMQ.getQueueName(); - - ConnectionFactory factory=new ActiveMQConnectionFactory(url); - this.connection= factory.createConnection(); - this.connection.start(); - this.session= this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination=session.createQueue(queueName); - this.producer = session.createProducer(destination); + String url=configMQ.getUrl(); + url = url.trim(); + + String protocol = url.substring(0, 3); + if(protocol.equalsIgnoreCase("tcp")) { + ConnectionFactory factory=new ActiveMQConnectionFactory(url); + this.connection= factory.createConnection(); + this.connection.start(); + + this.session= this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination=session.createQueue(queueName); + this.producer = session.createProducer(destination); + + }else { + String keyStore = configMQ.getKeyStore(); + String keyStorePassword = configMQ.getKeyStorePassword(); + String trustStore = configMQ.getTrustStore(); + + ActiveMQSslConnectionFactory sslConnectionFactory = new ActiveMQSslConnectionFactory(); + sslConnectionFactory.setBrokerURL(url); + sslConnectionFactory.setKeyAndTrustManagers(this.loadKeyManager(keyStore, keyStorePassword), this.loadTrustManager(trustStore), + new java.security.SecureRandom()); + this.connection = sslConnectionFactory.createConnection(); + this.connection.start(); + + this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + this.producer = session.createProducer(destination); + } + } catch (Exception e) { e.printStackTrace(); logger.error(e); @@ -57,12 +87,12 @@ public class ActiveMQ implements MQInterface { public static void main(String[] args) { ActiveMQ MQClient = getInstance(); - //System.out.println(MQClient.getTopic()); Message msg = new Message(); msg.setBody("body"); msg.setConsumed(false); msg.setCreateTime(new Date()); msg.setPerson("person"); + MQClient.sendMessage(msg); } @Override @@ -83,13 +113,53 @@ public class ActiveMQ implements MQInterface { } public void destroy() { - System.out.println("MqActive destroy....."); - try { + try { + logger.info("MqActive destroy....."); this.connection.close(); } catch (JMSException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + e.printStackTrace(); logger.error(e); } - } + } + + + /** + * 加载证书文件 + * @param trustStore + * @return + * @throws java.security.NoSuchAlgorithmException + * @throws java.security.KeyStoreException + * @throws java.io.IOException + * @throws java.security.GeneralSecurityException + */ + public static TrustManager[] loadTrustManager(String trustStore) throws java.security.NoSuchAlgorithmException, java.security.KeyStoreException, + java.io.IOException, java.security.GeneralSecurityException { + KeyStore ks = KeyStore. getInstance("JKS"); + ks.load( new FileInputStream(trustStore), null); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory. getDefaultAlgorithm()); + tmf.init(ks); + return tmf.getTrustManagers(); + } + + /** + * 加载密钥文件 + * @param keyStore + * @param keyStorePassword + * @return + * @throws java.security.NoSuchAlgorithmException + * @throws java.security.KeyStoreException + * @throws java.security.GeneralSecurityException + * @throws java.security.cert.CertificateException + * @throws java.io.IOException + * @throws java.security.UnrecoverableKeyException + */ + public static KeyManager[] loadKeyManager(String keyStore, String keyStorePassword) throws java.security.NoSuchAlgorithmException, + java.security.KeyStoreException, java.security.GeneralSecurityException, java.security.cert.CertificateException, java.io.IOException, + java.security.UnrecoverableKeyException { + KeyStore ks = KeyStore. getInstance("JKS"); + ks.load( new FileInputStream(keyStore), keyStorePassword.toCharArray()); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory. getDefaultAlgorithm()); + kmf.init(ks, keyStorePassword.toCharArray()); + return kmf.getKeyManagers(); + } } \ No newline at end of file