提交 37fbb7be 编写于 作者: L lindzh 提交者: dongeforever

[ROCKETMQ-194] log appender support closes apache/incubator-rocketmq#101

上级 99ce40a7
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.1.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-logappender</artifactId>
<packaging>jar</packaging>
<name>rocketmq-logappender ${project.version}</name>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-namesrv</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-broker</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender.common;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Common Producer component
*/
public class ProducerInstance {
public static final String APPENDER_TYPE = "APPENDER_TYPE";
public static final String LOG4J_APPENDER = "LOG4J_APPENDER";
public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";
public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";
public static final String DEFAULT_GROUP = "rocketmq_appender";
private static ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();
private static String genKey(String nameServerAddress, String group) {
return nameServerAddress + "_" + group;
}
public static MQProducer getInstance(String nameServerAddress, String group) throws MQClientException {
if (group == null) {
group = DEFAULT_GROUP;
}
String genKey = genKey(nameServerAddress, group);
MQProducer p = producerMap.get(genKey);
if (p != null) {
return p;
}
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
defaultMQProducer.setNamesrvAddr(nameServerAddress);
MQProducer beforeProducer = null;
//cas put producer
beforeProducer = producerMap.putIfAbsent(genKey, defaultMQProducer);
if (beforeProducer != null) {
return beforeProducer;
}
defaultMQProducer.start();
return defaultMQProducer;
}
public static void removeAndClose(String nameServerAddress, String group) {
if (group == null) {
group = DEFAULT_GROUP;
}
String genKey = genKey(nameServerAddress, group);
MQProducer producer = producerMap.remove(genKey);
if (producer != null) {
producer.shutdown();
}
}
public static void closeAll() {
Set<Map.Entry<String, MQProducer>> entries = producerMap.entrySet();
for (Map.Entry<String, MQProducer> entry : entries) {
producerMap.remove(entry.getKey());
entry.getValue().shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender.log4j;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logappender.common.ProducerInstance;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.ErrorCode;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.rocketmq.client.producer.MQProducer;
/**
* Log4j Appender Component
*/
public class RocketmqLog4jAppender extends AppenderSkeleton {
/**
* Appended message tag define
*/
private String tag;
/**
* Whitch topic to send log messages
*/
private String topic;
private boolean locationInfo;
/**
* Log producer send instance
*/
private MQProducer producer;
/**
* RocketMQ nameserver address
*/
private String nameServerAddress;
/**
* Log producer group
*/
private String producerGroup;
public RocketmqLog4jAppender() {
}
public void activateOptions() {
LogLog.debug("Getting initial context.");
if (!checkEntryConditions()) {
return;
}
try {
producer = ProducerInstance.getInstance(nameServerAddress, producerGroup);
} catch (Exception e) {
LogLog.error("activateOptions nameserver:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
}
/**
* Info,error,warn,callback method implementation
*
* @param event
*/
public void append(LoggingEvent event) {
if (null == producer) {
return;
}
if (locationInfo) {
event.getLocationInformation();
}
byte[] data = this.layout.format(event).getBytes();
try {
Message msg = new Message(topic, tag, data);
msg.getProperties().put(ProducerInstance.APPENDER_TYPE, ProducerInstance.LOG4J_APPENDER);
//Send message and do not wait for the ack from the message broker.
producer.sendOneway(msg);
} catch (Exception e) {
String msg = new String(data);
errorHandler.error("Could not send message in RocketmqLog4jAppender [" + name + "].Message is :" + msg, e,
ErrorCode.GENERIC_FAILURE);
}
}
protected boolean checkEntryConditions() {
String fail = null;
if (this.topic == null) {
fail = "No topic";
} else if (this.tag == null) {
fail = "No tag";
}
if (fail != null) {
errorHandler.error(fail + " for RocketmqLog4jAppender named [" + name + "].");
return false;
} else {
return true;
}
}
/**
* When system exit,this method will be called to close resources
*/
public synchronized void close() {
// The synchronized modifier avoids concurrent append and close operations
if (this.closed)
return;
LogLog.debug("Closing RocketmqLog4jAppender [" + name + "].");
this.closed = true;
try {
ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup);
} catch (Exception e) {
LogLog.error("Closing RocketmqLog4jAppender [" + name + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
// Help garbage collection
producer = null;
}
public boolean requiresLayout() {
return true;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
/**
* Returns value of the <b>LocationInfo</b> property which
* determines whether location (stack) info is sent to the remote
* subscriber.
*/
public boolean isLocationInfo() {
return locationInfo;
}
/**
* If true, the information sent to the remote subscriber will
* include caller's location information. By default no location
* information is sent to the subscriber.
*/
public void setLocationInfo(boolean locationInfo) {
this.locationInfo = locationInfo;
}
/**
* Returns the message producer,Only valid after
* activateOptions() method has been invoked.
*/
protected MQProducer getProducer() {
return producer;
}
public void setNameServerAddress(String nameServerAddress) {
this.nameServerAddress = nameServerAddress;
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender.log4j2;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.ErrorHandler;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.Node;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logappender.common.ProducerInstance;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
import org.apache.logging.log4j.core.layout.SerializedLayout;
import org.apache.rocketmq.client.producer.MQProducer;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
/**
* Log4j2 Appender Component
*/
@Plugin(name = "RocketMQ",
category = Node.CATEGORY,
elementType = Appender.ELEMENT_TYPE,
printObject = true)
public class RocketmqLog4j2Appender extends AbstractAppender {
/**
* RocketMQ nameserver address
*/
private String nameServerAddress;
/**
* Log producer group
*/
private String producerGroup;
/**
* Log producer send instance
*/
private MQProducer producer;
/**
* Appended message tag define
*/
private String tag;
/**
* Whitch topic to send log messages
*/
private String topic;
protected RocketmqLog4j2Appender(String name, Filter filter, Layout<? extends Serializable> layout,
boolean ignoreExceptions, String nameServerAddress, String producerGroup,
String topic, String tag) {
super(name, filter, layout, ignoreExceptions);
this.producer = producer;
this.topic = topic;
this.tag = tag;
this.nameServerAddress = nameServerAddress;
this.producerGroup = producerGroup;
try {
this.producer = ProducerInstance.getInstance(this.nameServerAddress, this.producerGroup);
} catch (Exception e) {
ErrorHandler handler = this.getHandler();
if (handler != null) {
handler.error("Starting RocketmqLog4j2Appender [" + this.getName()
+ "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
}
}
/**
* Info,error,warn,callback method implementation
*
* @param event
*/
public void append(LogEvent event) {
if (null == producer) {
return;
}
byte[] data = this.getLayout().toByteArray(event);
try {
Message msg = new Message(topic, tag, data);
msg.getProperties().put(ProducerInstance.APPENDER_TYPE, ProducerInstance.LOG4J2_APPENDER);
//Send message and do not wait for the ack from the message broker.
producer.sendOneway(msg);
} catch (Exception e) {
ErrorHandler handler = this.getHandler();
if (handler != null) {
String msg = new String(data);
handler.error("Could not send message in RocketmqLog4j2Appender [" + this.getName() + "].Message is : " + msg, e);
}
}
}
/**
* When system exit,this method will be called to close resources
*
* @param timeout
* @param timeUnit
* @return
*/
public boolean stop(long timeout, TimeUnit timeUnit) {
this.setStopping();
try {
ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup);
} catch (Exception e) {
ErrorHandler handler = this.getHandler();
if (handler != null) {
handler.error("Closeing RocketmqLog4j2Appender [" + this.getName()
+ "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
}
boolean stopped = super.stop(timeout, timeUnit, false);
this.setStopped();
return stopped;
}
/**
* Log4j2 builder creator
*/
@PluginBuilderFactory
public static RocketmqLog4j2Appender.Builder newBuilder() {
return new RocketmqLog4j2Appender.Builder();
}
/**
* Log4j2 xml builder define
*/
public static class Builder implements org.apache.logging.log4j.core.util.Builder<RocketmqLog4j2Appender> {
@PluginBuilderAttribute
@Required(message = "A name for the RocketmqLog4j2Appender must be specified")
private String name;
@PluginElement("Layout")
private Layout<? extends Serializable> layout;
@PluginElement("Filter")
private Filter filter;
@PluginBuilderAttribute
private boolean ignoreExceptions;
@PluginBuilderAttribute
private String tag;
@PluginBuilderAttribute
private String nameServerAddress;
@PluginBuilderAttribute
private String producerGroup;
@PluginBuilderAttribute
@Required(message = "A topic name must be specified")
private String topic;
private Builder() {
this.layout = SerializedLayout.createLayout();
this.ignoreExceptions = true;
}
public RocketmqLog4j2Appender.Builder setName(String name) {
this.name = name;
return this;
}
public RocketmqLog4j2Appender.Builder setLayout(Layout<? extends Serializable> layout) {
this.layout = layout;
return this;
}
public RocketmqLog4j2Appender.Builder setFilter(Filter filter) {
this.filter = filter;
return this;
}
public RocketmqLog4j2Appender.Builder setIgnoreExceptions(boolean ignoreExceptions) {
this.ignoreExceptions = ignoreExceptions;
return this;
}
public RocketmqLog4j2Appender.Builder setTag(final String tag) {
this.tag = tag;
return this;
}
public RocketmqLog4j2Appender.Builder setTopic(final String topic) {
this.topic = topic;
return this;
}
public RocketmqLog4j2Appender.Builder setNameServerAddress(String nameServerAddress) {
this.nameServerAddress = nameServerAddress;
return this;
}
public RocketmqLog4j2Appender.Builder setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
return this;
}
public RocketmqLog4j2Appender build() {
return new RocketmqLog4j2Appender(name, filter, layout, ignoreExceptions,
nameServerAddress, producerGroup, topic, tag);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender.logback;
import ch.qos.logback.classic.net.LoggingEventPreSerializationTransformer;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.spi.PreSerializationTransformer;
import ch.qos.logback.core.status.ErrorStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logappender.common.ProducerInstance;
import org.apache.rocketmq.client.producer.MQProducer;
/**
* Logback Appender Component
*/
public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
/**
* Message tag define
*/
private String tag;
/**
* Whitch topic to send log messages
*/
private String topic;
/**
* RocketMQ nameserver address
*/
private String nameServerAddress;
/**
* Log producer group
*/
private String producerGroup;
/**
* Log producer send instance
*/
private MQProducer producer;
private Layout layout;
private PreSerializationTransformer<ILoggingEvent> pst = new LoggingEventPreSerializationTransformer();
/**
* Info,error,warn,callback method implementation
*
* @param event
*/
@Override
protected void append(ILoggingEvent event) {
if (!isStarted()) {
return;
}
String logStr = this.layout.doLayout(event);
try {
Message msg = new Message(topic, tag, logStr.getBytes());
msg.getProperties().put(ProducerInstance.APPENDER_TYPE, ProducerInstance.LOGBACK_APPENDER);
//Send message and do not wait for the ack from the message broker.
producer.sendOneway(msg);
} catch (Exception e) {
addError("Could not send message in RocketmqLogbackAppender [" + name + "]. Message is : " + logStr, e);
}
}
/**
* Options are activated and become effective only after calling this method.
*/
public void start() {
int errors = 0;
if (this.layout == null) {
addStatus(new ErrorStatus("No layout set for the RocketmqLogbackAppender named \"" + name + "\".", this));
errors++;
}
if (errors > 0 || !checkEntryConditions()) {
return;
}
try {
producer = ProducerInstance.getInstance(nameServerAddress, producerGroup);
} catch (Exception e) {
addError("Starting RocketmqLogbackAppender [" + this.getName()
+ "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
if (producer != null) {
super.start();
}
}
/**
* When system exit,this method will be called to close resources
*/
public synchronized void stop() {
// The synchronized modifier avoids concurrent append and close operations
if (!this.started) {
return;
}
this.started = false;
try {
ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup);
} catch (Exception e) {
addError("Closeing RocketmqLogbackAppender [" + this.getName()
+ "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
// Help garbage collection
producer = null;
}
protected boolean checkEntryConditions() {
String fail = null;
if (this.topic == null) {
fail = "No topic";
}
if (fail != null) {
addError(fail + " for RocketmqLogbackAppender named [" + name + "].");
return false;
} else {
return true;
}
}
public Layout getLayout() {
return this.layout;
}
/**
* Set the pattern layout to format the log.
*/
public void setLayout(Layout layout) {
this.layout = layout;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setNameServerAddress(String nameServerAddress) {
this.nameServerAddress = nameServerAddress;
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.logappender.common.ProducerInstance;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Basic test rocketmq broker and name server init
*/
public class AbstractTestCase {
private static String nameServer = "localhost:9876";
private static NamesrvController namesrvController;
private static BrokerController brokerController;
private static String topic = "TopicTest";
@BeforeClass
public static void startRocketmqService() throws Exception {
startNamesrv();
startBroker();
}
/**
* Start rocketmq name server
* @throws Exception
*/
private static void startNamesrv() throws Exception {
NamesrvConfig namesrvConfig = new NamesrvConfig();
NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
boolean initResult = namesrvController.initialize();
if (!initResult) {
namesrvController.shutdown();
throw new Exception();
}
namesrvController.start();
}
/**
* Start rocketmq broker service
* @throws Exception
*/
private static void startBroker() throws Exception {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setNamesrvAddr(nameServer);
brokerConfig.setBrokerId(MixAll.MASTER_ID);
NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(10911);
NettyClientConfig nettyClientConfig = new NettyClientConfig();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
boolean initResult = brokerController.initialize();
if (!initResult) {
brokerController.shutdown();
throw new Exception();
}
brokerController.start();
}
@AfterClass
public static void stop() {
ProducerInstance.closeAll();
if (brokerController != null) {
brokerController.shutdown();
}
if (namesrvController != null) {
namesrvController.shutdown();
}
}
protected int consumeMessages(int count,final String key,int timeout) throws MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(count);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello");
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody());
if(key==null||body.contains(key)){
countDownLatch.countDown();
cc.incrementAndGet();
continue;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
countDownLatch.await(timeout, TimeUnit.SECONDS);
consumer.shutdown();
return cc.get();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender;
import org.apache.log4j.PropertyConfigurator;
public class Log4jPropertiesTest extends Log4jTest {
@Override
public void init() {
PropertyConfigurator.configure("src/test/resources/log4j-example.properties");
}
@Override
public String getType() {
return "properties";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.exception.MQClientException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public abstract class Log4jTest extends AbstractTestCase{
@Before
public abstract void init();
public abstract String getType();
@Test
public void testLog4j() throws InterruptedException, MQClientException {
Logger logger = Logger.getLogger("testLogger");
for (int i = 0; i < 50; i++) {
logger.info("log4j " + this.getType() + " simple test message " + i);
}
int received = consumeMessages(30, "log4j",30);
Assert.assertTrue(received>20);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender;
import org.apache.log4j.xml.DOMConfigurator;
public class Log4jXmlTest extends Log4jTest {
@Override
public void init() {
DOMConfigurator.configure("src/test/resources/log4j-example.xml");
}
@Override
public String getType() {
return "xml";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import ch.qos.logback.core.util.StatusPrinter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
public class LogbackTest extends AbstractTestCase{
@Before
public void init() throws JoranException {
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(new File("src/test/resources/logback-example.xml"));
StatusPrinter.printInCaseOfErrorsOrWarnings(lc);
}
@Test
public void testLogback() throws InterruptedException, MQClientException {
Logger logger = LoggerFactory.getLogger("testLogger");
for (int i = 0; i < 50; i++) {
logger.info("logback test message " + i);
}
int received = consumeMessages(30, "logback",30);
Assert.assertTrue(received>20);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.logappender;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.rocketmq.client.exception.MQClientException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class log4j2Test extends AbstractTestCase{
@Before
public void init() {
Configurator.initialize("log4j2", "src/test/resources/log4j2-example.xml");
}
@Test
public void testLog4j2() throws InterruptedException, MQClientException {
Logger logger = LogManager.getLogger("test");
for (int i = 0; i < 50; i++) {
logger.info("log4j2 log message " + i);
}
int received = consumeMessages(30, "log4j2",30);
Assert.assertTrue(received>20);
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO,stdout
log4j.logger.testLogger=INFO,mq
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %-4r [%t] (%F:%L) %-5p - %m%n
log4j.appender.store=org.apache.log4j.DailyRollingFileAppender
log4j.appender.store.File=${user.home}/logs/rocketmqlogs/appender.log
log4j.appender.store.Append=true
log4j.appender.store.DatePattern ='_'yyyy-MM-dd'.log'
log4j.appender.store.layout=org.apache.log4j.PatternLayout
log4j.appender.store.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=log
log4j.appender.mq.Topic=TopicTest
log4j.appender.mq.ProducerGroup=log4jp
log4j.appender.mq.NameServerAddress=127.0.0.1:9876
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender">
<param name="Encoding" value="UTF-8" />
<param name="Target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss},%d %-4r [%t] (%F:%L) %-5p - %m%n" />
</layout>
</appender>
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
<param name="Tag" value="log1" />
<param name="Topic" value="TopicTest" />
<param name="ProducerGroup" value="log4jxml" />
<param name="NameServerAddress" value="127.0.0.1:9876"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
</layout>
</appender>
<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
<param name="BufferSize" value="1024" />
<param name="Blocking" value="false" />
<appender-ref ref="mqAppender1"/>
</appender>
<logger name="testLogger" additivity="false">
<level value="INFO" />
<appender-ref ref="mqAsyncAppender1" />
<appender-ref ref="consoleAppender" />
</logger>
<logger name="consoleLogger" additivity="false">
<level value="INFO" />
<appender-ref ref="consoleAppender" />
</logger>
<root>
<level value="INFO" />
<appender-ref ref="consoleAppender"/>
</root>
</log4j:configuration>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<Configuration status="warn" name="Rocketmq">
<Appenders>
<RocketMQ name="rocketmqAppender" producerGroup="log4j2" nameServerAddress="127.0.0.1:9876"
topic="TopicTest" tag="log">
<PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Logger name="rocketmqLogger" level="info">
<AppenderRef ref="rocketmqAppender"/>
</Logger>
<Root level="debug">
<AppenderRef ref="Console"/>
<AppenderRef ref="rocketmqAppender"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="system" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/simple/system.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${user.home}/logs/simple/system.%i.log
</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>30</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%date %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<appender name="consoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder>
<pattern>%date %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<appender name="dailyAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/simple/daily.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${user.home}/logs/simple/daily.log.%d{yyyy-MM-dd_HH}</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%date %p %t - %m%n</pattern>
</encoder>
</appender>
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
<tag>log1</tag>
<topic>TopicTest</topic>
<producerGroup>logback</producerGroup>
<nameServerAddress>127.0.0.1:9876</nameServerAddress>
<layout>
<pattern>%date %p %t - %m%n</pattern>
</layout>
</appender>
<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1024</queueSize>
<discardingThreshold>80</discardingThreshold>
<maxFlushTime>2000</maxFlushTime>
<neverBlock>true</neverBlock>
<appender-ref ref="mqAppender1"/>
</appender>
<root>
<level value="debug"/>
<appender-ref ref="consoleAppender"/>
</root>
<logger name="systemLogger" level="debug" additivity="false">
<appender-ref ref="system"/>
</logger>
<logger name="testLogger" level="debug" additivity="false">
<appender-ref ref="mqAsyncAppender1"/>
<appender-ref ref="consoleAppender"/>
</logger>
</configuration>
......@@ -175,6 +175,7 @@
<module>store</module>
<module>namesrv</module>
<module>remoting</module>
<module>logappender</module>
<module>example</module>
<module>filtersrv</module>
<module>srvutil</module>
......@@ -623,6 +624,16 @@
<artifactId>openmessaging-api</artifactId>
<version>0.1.0-alpha</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.7</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
......@@ -30,6 +30,12 @@
<!-- header -->
<module name="RegexpHeader">
<property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
<property name="fileExtensions" value="java"/>
</module>
<module name="RegexpHeader">
<property name="header" value="#[\s]*Licensed to the Apache Software Foundation*"/>
<property name="fileExtensions" value="properties"/>
</module>
<module name="RegexpSingleline">
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册