From 3eea44a714e6a488167b775c4fc612df44af6340 Mon Sep 17 00:00:00 2001 From: lindzh Date: Fri, 26 May 2017 16:12:16 +0800 Subject: [PATCH] [ROCKETMQ-194] log appender support closes apache/incubator-rocketmq#101 --- logappender/pom.xml | 86 +++++++ .../logappender/common/ProducerInstance.java | 93 +++++++ .../log4j/RocketmqLog4jAppender.java | 194 +++++++++++++++ .../log4j2/RocketmqLog4j2Appender.java | 233 ++++++++++++++++++ .../logback/RocketmqLogbackAppender.java | 183 ++++++++++++++ .../logappender/AbstractTestCase.java | 154 ++++++++++++ .../logappender/Log4jPropertiesTest.java | 32 +++ .../rocketmq/logappender/Log4jTest.java | 43 ++++ .../rocketmq/logappender/Log4jXmlTest.java | 32 +++ .../rocketmq/logappender/LogbackTest.java | 54 ++++ .../rocketmq/logappender/log4j2Test.java | 44 ++++ .../test/resources/log4j-example.properties | 38 +++ .../src/test/resources/log4j-example.xml | 62 +++++ .../src/test/resources/log4j2-example.xml | 41 +++ .../src/test/resources/logback-example.xml | 89 +++++++ pom.xml | 11 + style/rmq_checkstyle.xml | 6 + 17 files changed, 1395 insertions(+) create mode 100644 logappender/pom.xml create mode 100644 logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java create mode 100644 logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java create mode 100644 logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java create mode 100644 logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java create mode 100644 logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java create mode 100644 logappender/src/test/java/org/apache/rocketmq/logappender/Log4jPropertiesTest.java create mode 100644 logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java create mode 100644 logappender/src/test/java/org/apache/rocketmq/logappender/Log4jXmlTest.java create mode 100644 logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java create mode 100644 logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java create mode 100644 logappender/src/test/resources/log4j-example.properties create mode 100644 logappender/src/test/resources/log4j-example.xml create mode 100644 logappender/src/test/resources/log4j2-example.xml create mode 100644 logappender/src/test/resources/logback-example.xml diff --git a/logappender/pom.xml b/logappender/pom.xml new file mode 100644 index 00000000..5974c753 --- /dev/null +++ b/logappender/pom.xml @@ -0,0 +1,86 @@ + + + + + org.apache.rocketmq + rocketmq-all + 4.1.0-incubating-SNAPSHOT + + 4.0.0 + rocketmq-logappender + jar + rocketmq-logappender ${project.version} + + + + org.slf4j + slf4j-api + true + + + log4j + log4j + true + + + org.apache.logging.log4j + log4j-core + true + + + ch.qos.logback + logback-core + true + + + ch.qos.logback + logback-classic + true + + + ${project.groupId} + rocketmq-client + + + ${project.groupId} + rocketmq-namesrv + test + + + ch.qos.logback + logback-classic + + + + + ${project.groupId} + rocketmq-broker + test + + + ch.qos.logback + logback-classic + + + + + + + + diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java new file mode 100644 index 00000000..669e30c0 --- /dev/null +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender.common; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MQProducer; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Common Producer component + */ +public class ProducerInstance { + + public static final String APPENDER_TYPE = "APPENDER_TYPE"; + + public static final String LOG4J_APPENDER = "LOG4J_APPENDER"; + + public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER"; + + public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER"; + + public static final String DEFAULT_GROUP = "rocketmq_appender"; + + private static ConcurrentHashMap producerMap = new ConcurrentHashMap(); + + private static String genKey(String nameServerAddress, String group) { + return nameServerAddress + "_" + group; + } + + + public static MQProducer getInstance(String nameServerAddress, String group) throws MQClientException { + if (group == null) { + group = DEFAULT_GROUP; + } + + String genKey = genKey(nameServerAddress, group); + MQProducer p = producerMap.get(genKey); + if (p != null) { + return p; + } + + DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group); + defaultMQProducer.setNamesrvAddr(nameServerAddress); + MQProducer beforeProducer = null; + //cas put producer + beforeProducer = producerMap.putIfAbsent(genKey, defaultMQProducer); + if (beforeProducer != null) { + return beforeProducer; + } + defaultMQProducer.start(); + return defaultMQProducer; + } + + + public static void removeAndClose(String nameServerAddress, String group) { + if (group == null) { + group = DEFAULT_GROUP; + } + String genKey = genKey(nameServerAddress, group); + MQProducer producer = producerMap.remove(genKey); + + if (producer != null) { + producer.shutdown(); + } + } + + public static void closeAll() { + Set> entries = producerMap.entrySet(); + for (Map.Entry entry : entries) { + producerMap.remove(entry.getKey()); + entry.getValue().shutdown(); + } + } + +} diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java new file mode 100644 index 00000000..b2983b6c --- /dev/null +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender.log4j; + +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.logappender.common.ProducerInstance; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.helpers.LogLog; +import org.apache.log4j.spi.ErrorCode; +import org.apache.log4j.spi.LoggingEvent; +import org.apache.rocketmq.client.producer.MQProducer; + +/** + * Log4j Appender Component + */ +public class RocketmqLog4jAppender extends AppenderSkeleton { + + /** + * Appended message tag define + */ + private String tag; + + /** + * Whitch topic to send log messages + */ + private String topic; + + private boolean locationInfo; + + /** + * Log producer send instance + */ + private MQProducer producer; + + /** + * RocketMQ nameserver address + */ + private String nameServerAddress; + + /** + * Log producer group + */ + private String producerGroup; + + public RocketmqLog4jAppender() { + } + + + public void activateOptions() { + LogLog.debug("Getting initial context."); + if (!checkEntryConditions()) { + return; + } + try { + producer = ProducerInstance.getInstance(nameServerAddress, producerGroup); + } catch (Exception e) { + LogLog.error("activateOptions nameserver:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + } + } + + + /** + * Info,error,warn,callback method implementation + * + * @param event + */ + public void append(LoggingEvent event) { + if (null == producer) { + return; + } + if (locationInfo) { + event.getLocationInformation(); + } + byte[] data = this.layout.format(event).getBytes(); + try { + Message msg = new Message(topic, tag, data); + msg.getProperties().put(ProducerInstance.APPENDER_TYPE, ProducerInstance.LOG4J_APPENDER); + + //Send message and do not wait for the ack from the message broker. + producer.sendOneway(msg); + } catch (Exception e) { + String msg = new String(data); + errorHandler.error("Could not send message in RocketmqLog4jAppender [" + name + "].Message is :" + msg, e, + ErrorCode.GENERIC_FAILURE); + } + } + + protected boolean checkEntryConditions() { + String fail = null; + + if (this.topic == null) { + fail = "No topic"; + } else if (this.tag == null) { + fail = "No tag"; + } + + if (fail != null) { + errorHandler.error(fail + " for RocketmqLog4jAppender named [" + name + "]."); + return false; + } else { + return true; + } + } + + /** + * When system exit,this method will be called to close resources + */ + public synchronized void close() { + // The synchronized modifier avoids concurrent append and close operations + + if (this.closed) + return; + + LogLog.debug("Closing RocketmqLog4jAppender [" + name + "]."); + this.closed = true; + + try { + ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); + } catch (Exception e) { + LogLog.error("Closing RocketmqLog4jAppender [" + name + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + } + // Help garbage collection + producer = null; + } + + public boolean requiresLayout() { + return true; + } + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + /** + * Returns value of the LocationInfo property which + * determines whether location (stack) info is sent to the remote + * subscriber. + */ + public boolean isLocationInfo() { + return locationInfo; + } + + /** + * If true, the information sent to the remote subscriber will + * include caller's location information. By default no location + * information is sent to the subscriber. + */ + public void setLocationInfo(boolean locationInfo) { + this.locationInfo = locationInfo; + } + + /** + * Returns the message producer,Only valid after + * activateOptions() method has been invoked. + */ + protected MQProducer getProducer() { + return producer; + } + + public void setNameServerAddress(String nameServerAddress) { + this.nameServerAddress = nameServerAddress; + } + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } +} diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java new file mode 100644 index 00000000..fb8341f1 --- /dev/null +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender.log4j2; + +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.ErrorHandler; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.config.Node; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.logappender.common.ProducerInstance; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; +import org.apache.logging.log4j.core.layout.SerializedLayout; +import org.apache.rocketmq.client.producer.MQProducer; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * Log4j2 Appender Component + */ +@Plugin(name = "RocketMQ", + category = Node.CATEGORY, + elementType = Appender.ELEMENT_TYPE, + printObject = true) +public class RocketmqLog4j2Appender extends AbstractAppender { + + /** + * RocketMQ nameserver address + */ + private String nameServerAddress; + + /** + * Log producer group + */ + private String producerGroup; + + /** + * Log producer send instance + */ + private MQProducer producer; + + /** + * Appended message tag define + */ + private String tag; + + /** + * Whitch topic to send log messages + */ + private String topic; + + + protected RocketmqLog4j2Appender(String name, Filter filter, Layout layout, + boolean ignoreExceptions, String nameServerAddress, String producerGroup, + String topic, String tag) { + super(name, filter, layout, ignoreExceptions); + this.producer = producer; + this.topic = topic; + this.tag = tag; + this.nameServerAddress = nameServerAddress; + this.producerGroup = producerGroup; + try { + this.producer = ProducerInstance.getInstance(this.nameServerAddress, this.producerGroup); + } catch (Exception e) { + ErrorHandler handler = this.getHandler(); + if (handler != null) { + handler.error("Starting RocketmqLog4j2Appender [" + this.getName() + + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + } + } + } + + /** + * Info,error,warn,callback method implementation + * + * @param event + */ + public void append(LogEvent event) { + if (null == producer) { + return; + } + byte[] data = this.getLayout().toByteArray(event); + try { + Message msg = new Message(topic, tag, data); + msg.getProperties().put(ProducerInstance.APPENDER_TYPE, ProducerInstance.LOG4J2_APPENDER); + + //Send message and do not wait for the ack from the message broker. + producer.sendOneway(msg); + } catch (Exception e) { + ErrorHandler handler = this.getHandler(); + if (handler != null) { + String msg = new String(data); + handler.error("Could not send message in RocketmqLog4j2Appender [" + this.getName() + "].Message is : " + msg, e); + } + + } + } + + /** + * When system exit,this method will be called to close resources + * + * @param timeout + * @param timeUnit + * @return + */ + public boolean stop(long timeout, TimeUnit timeUnit) { + this.setStopping(); + try { + ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); + } catch (Exception e) { + ErrorHandler handler = this.getHandler(); + if (handler != null) { + handler.error("Closeing RocketmqLog4j2Appender [" + this.getName() + + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + } + } + + boolean stopped = super.stop(timeout, timeUnit, false); + this.setStopped(); + return stopped; + } + + /** + * Log4j2 builder creator + */ + @PluginBuilderFactory + public static RocketmqLog4j2Appender.Builder newBuilder() { + return new RocketmqLog4j2Appender.Builder(); + } + + /** + * Log4j2 xml builder define + */ + public static class Builder implements org.apache.logging.log4j.core.util.Builder { + + @PluginBuilderAttribute + @Required(message = "A name for the RocketmqLog4j2Appender must be specified") + private String name; + + @PluginElement("Layout") + private Layout layout; + + @PluginElement("Filter") + private Filter filter; + + @PluginBuilderAttribute + private boolean ignoreExceptions; + + @PluginBuilderAttribute + private String tag; + + @PluginBuilderAttribute + private String nameServerAddress; + + @PluginBuilderAttribute + private String producerGroup; + + @PluginBuilderAttribute + @Required(message = "A topic name must be specified") + private String topic; + + private Builder() { + this.layout = SerializedLayout.createLayout(); + this.ignoreExceptions = true; + } + + public RocketmqLog4j2Appender.Builder setName(String name) { + this.name = name; + return this; + } + + public RocketmqLog4j2Appender.Builder setLayout(Layout layout) { + this.layout = layout; + return this; + } + + public RocketmqLog4j2Appender.Builder setFilter(Filter filter) { + this.filter = filter; + return this; + } + + public RocketmqLog4j2Appender.Builder setIgnoreExceptions(boolean ignoreExceptions) { + this.ignoreExceptions = ignoreExceptions; + return this; + } + + public RocketmqLog4j2Appender.Builder setTag(final String tag) { + this.tag = tag; + return this; + } + + public RocketmqLog4j2Appender.Builder setTopic(final String topic) { + this.topic = topic; + return this; + } + + public RocketmqLog4j2Appender.Builder setNameServerAddress(String nameServerAddress) { + this.nameServerAddress = nameServerAddress; + return this; + } + + public RocketmqLog4j2Appender.Builder setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + return this; + } + + public RocketmqLog4j2Appender build() { + return new RocketmqLog4j2Appender(name, filter, layout, ignoreExceptions, + nameServerAddress, producerGroup, topic, tag); + } + } +} diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java new file mode 100644 index 00000000..cb455228 --- /dev/null +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender.logback; + +import ch.qos.logback.classic.net.LoggingEventPreSerializationTransformer; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.AppenderBase; +import ch.qos.logback.core.Layout; +import ch.qos.logback.core.spi.PreSerializationTransformer; +import ch.qos.logback.core.status.ErrorStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.logappender.common.ProducerInstance; +import org.apache.rocketmq.client.producer.MQProducer; + +/** + * Logback Appender Component + */ +public class RocketmqLogbackAppender extends AppenderBase { + + /** + * Message tag define + */ + private String tag; + + /** + * Whitch topic to send log messages + */ + private String topic; + + /** + * RocketMQ nameserver address + */ + private String nameServerAddress; + + /** + * Log producer group + */ + private String producerGroup; + + /** + * Log producer send instance + */ + private MQProducer producer; + + private Layout layout; + + private PreSerializationTransformer pst = new LoggingEventPreSerializationTransformer(); + + /** + * Info,error,warn,callback method implementation + * + * @param event + */ + @Override + protected void append(ILoggingEvent event) { + if (!isStarted()) { + return; + } + String logStr = this.layout.doLayout(event); + try { + Message msg = new Message(topic, tag, logStr.getBytes()); + msg.getProperties().put(ProducerInstance.APPENDER_TYPE, ProducerInstance.LOGBACK_APPENDER); + + //Send message and do not wait for the ack from the message broker. + producer.sendOneway(msg); + } catch (Exception e) { + addError("Could not send message in RocketmqLogbackAppender [" + name + "]. Message is : " + logStr, e); + } + } + + /** + * Options are activated and become effective only after calling this method. + */ + public void start() { + int errors = 0; + + if (this.layout == null) { + addStatus(new ErrorStatus("No layout set for the RocketmqLogbackAppender named \"" + name + "\".", this)); + errors++; + } + + if (errors > 0 || !checkEntryConditions()) { + return; + } + try { + producer = ProducerInstance.getInstance(nameServerAddress, producerGroup); + } catch (Exception e) { + addError("Starting RocketmqLogbackAppender [" + this.getName() + + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + } + if (producer != null) { + super.start(); + } + } + + /** + * When system exit,this method will be called to close resources + */ + public synchronized void stop() { + // The synchronized modifier avoids concurrent append and close operations + if (!this.started) { + return; + } + + this.started = false; + + try { + ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); + } catch (Exception e) { + addError("Closeing RocketmqLogbackAppender [" + this.getName() + + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + } + + // Help garbage collection + producer = null; + } + + protected boolean checkEntryConditions() { + String fail = null; + + if (this.topic == null) { + fail = "No topic"; + } + + if (fail != null) { + addError(fail + " for RocketmqLogbackAppender named [" + name + "]."); + return false; + } else { + return true; + } + } + + + public Layout getLayout() { + return this.layout; + } + + /** + * Set the pattern layout to format the log. + */ + public void setLayout(Layout layout) { + this.layout = layout; + } + + public String getTag() { + return tag; + } + + + public void setTag(String tag) { + this.tag = tag; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setNameServerAddress(String nameServerAddress) { + this.nameServerAddress = nameServerAddress; + } + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } +} diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java new file mode 100644 index 00000000..d3e2f8ae --- /dev/null +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.logappender.common.ProducerInstance; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Basic test rocketmq broker and name server init + */ +public class AbstractTestCase { + + private static String nameServer = "localhost:9876"; + + private static NamesrvController namesrvController; + + private static BrokerController brokerController; + + private static String topic = "TopicTest"; + + @BeforeClass + public static void startRocketmqService() throws Exception { + + startNamesrv(); + + startBroker(); + } + + /** + * Start rocketmq name server + * @throws Exception + */ + private static void startNamesrv() throws Exception { + + NamesrvConfig namesrvConfig = new NamesrvConfig(); + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + nettyServerConfig.setListenPort(9876); + + namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig); + boolean initResult = namesrvController.initialize(); + if (!initResult) { + namesrvController.shutdown(); + throw new Exception(); + } + namesrvController.start(); + } + + /** + * Start rocketmq broker service + * @throws Exception + */ + private static void startBroker() throws Exception { + + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setNamesrvAddr(nameServer); + brokerConfig.setBrokerId(MixAll.MASTER_ID); + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + nettyServerConfig.setListenPort(10911); + NettyClientConfig nettyClientConfig = new NettyClientConfig(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + + brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); + boolean initResult = brokerController.initialize(); + if (!initResult) { + brokerController.shutdown(); + throw new Exception(); + } + brokerController.start(); + } + + @AfterClass + public static void stop() { + ProducerInstance.closeAll(); + if (brokerController != null) { + brokerController.shutdown(); + } + + if (namesrvController != null) { + namesrvController.shutdown(); + } + } + + protected int consumeMessages(int count,final String key,int timeout) throws MQClientException, InterruptedException { + + final AtomicInteger cc = new AtomicInteger(0); + final CountDownLatch countDownLatch = new CountDownLatch(count); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello"); + consumer.setNamesrvAddr(nameServer); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.subscribe(topic, "*"); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + for (MessageExt msg : msgs) { + String body = new String(msg.getBody()); + if(key==null||body.contains(key)){ + countDownLatch.countDown(); + cc.incrementAndGet(); + continue; + } + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + countDownLatch.await(timeout, TimeUnit.SECONDS); + consumer.shutdown(); + return cc.get(); + } +} diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jPropertiesTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jPropertiesTest.java new file mode 100644 index 00000000..86752301 --- /dev/null +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jPropertiesTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender; + +import org.apache.log4j.PropertyConfigurator; + +public class Log4jPropertiesTest extends Log4jTest { + + @Override + public void init() { + PropertyConfigurator.configure("src/test/resources/log4j-example.properties"); + } + + @Override + public String getType() { + return "properties"; + } +} diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java new file mode 100644 index 00000000..75f9bf2b --- /dev/null +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.exception.MQClientException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public abstract class Log4jTest extends AbstractTestCase{ + + @Before + public abstract void init(); + + public abstract String getType(); + + @Test + public void testLog4j() throws InterruptedException, MQClientException { + Logger logger = Logger.getLogger("testLogger"); + for (int i = 0; i < 50; i++) { + logger.info("log4j " + this.getType() + " simple test message " + i); + } + int received = consumeMessages(30, "log4j",30); + Assert.assertTrue(received>20); + } + +} diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jXmlTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jXmlTest.java new file mode 100644 index 00000000..6743f7c1 --- /dev/null +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jXmlTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender; + +import org.apache.log4j.xml.DOMConfigurator; + +public class Log4jXmlTest extends Log4jTest { + + @Override + public void init() { + DOMConfigurator.configure("src/test/resources/log4j-example.xml"); + } + + @Override + public String getType() { + return "xml"; + } +} diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java new file mode 100644 index 00000000..15a21a39 --- /dev/null +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.joran.JoranConfigurator; +import ch.qos.logback.core.joran.spi.JoranException; +import ch.qos.logback.core.util.StatusPrinter; +import org.apache.rocketmq.client.exception.MQClientException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +public class LogbackTest extends AbstractTestCase{ + + @Before + public void init() throws JoranException { + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + JoranConfigurator configurator = new JoranConfigurator(); + configurator.setContext(lc); + lc.reset(); + configurator.doConfigure(new File("src/test/resources/logback-example.xml")); + StatusPrinter.printInCaseOfErrorsOrWarnings(lc); + } + + + @Test + public void testLogback() throws InterruptedException, MQClientException { + Logger logger = LoggerFactory.getLogger("testLogger"); + for (int i = 0; i < 50; i++) { + logger.info("logback test message " + i); + } + int received = consumeMessages(30, "logback",30); + Assert.assertTrue(received>20); + } +} diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java b/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java new file mode 100644 index 00000000..75ba523f --- /dev/null +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.logappender; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.config.Configurator; +import org.apache.rocketmq.client.exception.MQClientException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class log4j2Test extends AbstractTestCase{ + + @Before + public void init() { + Configurator.initialize("log4j2", "src/test/resources/log4j2-example.xml"); + } + + + @Test + public void testLog4j2() throws InterruptedException, MQClientException { + Logger logger = LogManager.getLogger("test"); + for (int i = 0; i < 50; i++) { + logger.info("log4j2 log message " + i); + } + int received = consumeMessages(30, "log4j2",30); + Assert.assertTrue(received>20); + } +} diff --git a/logappender/src/test/resources/log4j-example.properties b/logappender/src/test/resources/log4j-example.properties new file mode 100644 index 00000000..b4e8114a --- /dev/null +++ b/logappender/src/test/resources/log4j-example.properties @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO,stdout + +log4j.logger.testLogger=INFO,mq + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d %-4r [%t] (%F:%L) %-5p - %m%n + +log4j.appender.store=org.apache.log4j.DailyRollingFileAppender +log4j.appender.store.File=${user.home}/logs/rocketmqlogs/appender.log +log4j.appender.store.Append=true +log4j.appender.store.DatePattern ='_'yyyy-MM-dd'.log' +log4j.appender.store.layout=org.apache.log4j.PatternLayout +log4j.appender.store.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n + +log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender +log4j.appender.mq.Tag=log +log4j.appender.mq.Topic=TopicTest +log4j.appender.mq.ProducerGroup=log4jp +log4j.appender.mq.NameServerAddress=127.0.0.1:9876 +log4j.appender.mq.layout=org.apache.log4j.PatternLayout +log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n \ No newline at end of file diff --git a/logappender/src/test/resources/log4j-example.xml b/logappender/src/test/resources/log4j-example.xml new file mode 100644 index 00000000..e58bcb09 --- /dev/null +++ b/logappender/src/test/resources/log4j-example.xml @@ -0,0 +1,62 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/logappender/src/test/resources/log4j2-example.xml b/logappender/src/test/resources/log4j2-example.xml new file mode 100644 index 00000000..358d40ea --- /dev/null +++ b/logappender/src/test/resources/log4j2-example.xml @@ -0,0 +1,41 @@ + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/logappender/src/test/resources/logback-example.xml b/logappender/src/test/resources/logback-example.xml new file mode 100644 index 00000000..21b54349 --- /dev/null +++ b/logappender/src/test/resources/logback-example.xml @@ -0,0 +1,89 @@ + + + + + + + ${user.home}/logs/simple/system.log + true + + ${user.home}/logs/simple/system.%i.log + + 1 + 30 + + + 100MB + + + %date %p %t - %m%n + UTF-8 + + + + + System.out + + %date %p %t - %m%n + UTF-8 + + + + + ${user.home}/logs/simple/daily.log + + ${user.home}/logs/simple/daily.log.%d{yyyy-MM-dd_HH} + 30 + + + %date %p %t - %m%n + + + + + log1 + TopicTest + logback + 127.0.0.1:9876 + + %date %p %t - %m%n + + + + + 1024 + 80 + 2000 + true + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 25e4c841..c60c93c1 100644 --- a/pom.xml +++ b/pom.xml @@ -175,6 +175,7 @@ store namesrv remoting + logappender example filtersrv srvutil @@ -623,6 +624,16 @@ openmessaging-api 0.1.0-alpha + + log4j + log4j + 1.2.17 + + + org.apache.logging.log4j + log4j-core + 2.7 + diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml index 2872eb72..6ec2ad08 100644 --- a/style/rmq_checkstyle.xml +++ b/style/rmq_checkstyle.xml @@ -30,6 +30,12 @@ + + + + + + -- GitLab