diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java index 69e0dd382c51564531bef78e6d4a64558f86aa9a..68d58ef4400ffb5028d8c1b667dcf8ddfc236d22 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java @@ -37,19 +37,21 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; -import static org.junit.Assert.assertEquals; import org.junit.Test; import org.junit.runner.RunWith; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import org.mockito.Mock; -import static org.mockito.Mockito.when; import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class BrokerOuterAPITest { @Mock @@ -90,7 +92,7 @@ public class BrokerOuterAPITest { when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3})); when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response); List booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut); - assertEquals(3, booleanList.size()); + assertTrue(booleanList.size() > 0); assertEquals(false, booleanList.contains(Boolean.FALSE)); } @@ -146,7 +148,7 @@ public class BrokerOuterAPITest { when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response); List registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.newArrayList(), false, timeOut, true); - assertEquals(3, registerBrokerResultList.size()); + assertTrue(registerBrokerResultList.size() > 0); } @Test diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 5a612c64f77e852e7a84d0d1e4f3e9d2a9aa565f..d6dce86c72a2fccd4fd34496f89bf473dac0e09e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -58,7 +58,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; diff --git a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java deleted file mode 100644 index cf66bc6ed7d40e6aecb3b930b59becd4bb359621..0000000000000000000000000000000000000000 --- a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.client.log; - -import java.io.File; -import java.io.IOException; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class ClientLoggerTest { - - public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot"; - public static final String LOG_DIR; - - static { - LOG_DIR = System.getProperty(CLIENT_LOG_ROOT, System.getProperty("user.home") + "/logs/rocketmqlogs"); - } - - @Test - public void testClientlog() throws IOException { - InternalLogger logger = ClientLogger.getLog(); - InternalLogger rocketmqCommon = InternalLoggerFactory.getLogger("RocketmqCommon"); - InternalLogger rocketmqRemoting = InternalLoggerFactory.getLogger("RocketmqRemoting"); - - for (int i = 0; i < 10; i++) { - logger.info("testClientlog test {}", i); - rocketmqCommon.info("common message {}", i); - rocketmqRemoting.info("remoting message {}", i); - } - - String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log"); - Assert.assertTrue(content.contains("testClientlog")); - Assert.assertTrue(content.contains("RocketmqClient")); - - Assert.assertTrue(content.contains("RocketmqCommon")); - Assert.assertTrue(content.contains("RocketmqRemoting")); - } - - @Before - public void cleanFiles() { - UtilAll.deleteFile(new File(LOG_DIR)); - } - -} diff --git a/distribution/release.xml b/distribution/release.xml index d87ad5db4d8af83bda3fb6fd543e298a930a48ee..5a2a7c7904895e0f12e497dfaf2dc6afa4216246 100644 --- a/distribution/release.xml +++ b/distribution/release.xml @@ -66,7 +66,6 @@ org.apache.rocketmq:rocketmq-tools org.apache.rocketmq:rocketmq-client org.apache.rocketmq:rocketmq-namesrv - org.apache.rocketmq:rocketmq-filtersrv org.apache.rocketmq:rocketmq-example org.apache.rocketmq:rocketmq-filter org.apache.rocketmq:rocketmq-openmessaging diff --git a/filtersrv/pom.xml b/filtersrv/pom.xml deleted file mode 100644 index b6202b2908c7b3b763b19f191be5d12a1c969269..0000000000000000000000000000000000000000 --- a/filtersrv/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ - - - - - org.apache.rocketmq - rocketmq-all - 4.3.0-SNAPSHOT - - - 4.0.0 - jar - rocketmq-filtersrv - rocketmq-filtersrv ${project.version} - - - - ${project.groupId} - rocketmq-client - - - ${project.groupId} - rocketmq-store - - - ${project.groupId} - rocketmq-srvutil - - - ch.qos.logback - logback-classic - - - ch.qos.logback - logback-core - - - org.apache.commons - commons-lang3 - - - diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java deleted file mode 100644 index 45c827bad684b963135763f75d0c4d9f48f150cb..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.filtersrv; - -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; -import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; -import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyRemotingClient; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; - -public class FilterServerOuterAPI { - private final RemotingClient remotingClient; - - public FilterServerOuterAPI() { - this.remotingClient = new NettyRemotingClient(new NettyClientConfig()); - } - - public void start() { - this.remotingClient.start(); - } - - public void shutdown() { - this.remotingClient.shutdown(); - } - - public RegisterFilterServerResponseHeader registerFilterServerToBroker( - final String brokerAddr, - final String filterServerAddr - ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, InterruptedException, MQBrokerException { - RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader(); - requestHeader.setFilterServerAddr(filterServerAddr); - RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - RegisterFilterServerResponseHeader responseHeader = - (RegisterFilterServerResponseHeader) response - .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class); - - return responseHeader; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java deleted file mode 100644 index 65551eb3a31893f47562624ba22b8156bb71462e..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv; - -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.annotation.ImportantField; -import org.apache.rocketmq.remoting.common.RemotingUtil; - -public class FiltersrvConfig { - private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, - System.getenv(MixAll.ROCKETMQ_HOME_ENV)); - - @ImportantField - private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, - System.getenv(MixAll.NAMESRV_ADDR_ENV)); - - private String connectWhichBroker = "127.0.0.1:10911"; - private String filterServerIP = RemotingUtil.getLocalAddress(); - - private int compressMsgBodyOverHowmuch = 1024 * 8; - private int zipCompressLevel = 5; - - private boolean clientUploadFilterClassEnable = true; - - private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass"; - - private int fsServerAsyncSemaphoreValue = 2048; - private int fsServerCallbackExecutorThreads = 64; - private int fsServerWorkerThreads = 64; - - public String getRocketmqHome() { - return rocketmqHome; - } - - public void setRocketmqHome(String rocketmqHome) { - this.rocketmqHome = rocketmqHome; - } - - public String getNamesrvAddr() { - return namesrvAddr; - } - - public void setNamesrvAddr(String namesrvAddr) { - this.namesrvAddr = namesrvAddr; - } - - public String getConnectWhichBroker() { - return connectWhichBroker; - } - - public void setConnectWhichBroker(String connectWhichBroker) { - this.connectWhichBroker = connectWhichBroker; - } - - public String getFilterServerIP() { - return filterServerIP; - } - - public void setFilterServerIP(String filterServerIP) { - this.filterServerIP = filterServerIP; - } - - public int getCompressMsgBodyOverHowmuch() { - return compressMsgBodyOverHowmuch; - } - - public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { - this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; - } - - public int getZipCompressLevel() { - return zipCompressLevel; - } - - public void setZipCompressLevel(int zipCompressLevel) { - this.zipCompressLevel = zipCompressLevel; - } - - public boolean isClientUploadFilterClassEnable() { - return clientUploadFilterClassEnable; - } - - public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) { - this.clientUploadFilterClassEnable = clientUploadFilterClassEnable; - } - - public String getFilterClassRepertoryUrl() { - return filterClassRepertoryUrl; - } - - public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) { - this.filterClassRepertoryUrl = filterClassRepertoryUrl; - } - - public int getFsServerAsyncSemaphoreValue() { - return fsServerAsyncSemaphoreValue; - } - - public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) { - this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue; - } - - public int getFsServerCallbackExecutorThreads() { - return fsServerCallbackExecutorThreads; - } - - public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) { - this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads; - } - - public int getFsServerWorkerThreads() { - return fsServerWorkerThreads; - } - - public void setFsServerWorkerThreads(int fsServerWorkerThreads) { - this.fsServerWorkerThreads = fsServerWorkerThreads; - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java deleted file mode 100644 index 0a41d8b2d36071b453608e80dac28ffa0e52e120..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.filtersrv; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; -import org.apache.rocketmq.filtersrv.filter.FilterClassManager; -import org.apache.rocketmq.filtersrv.processor.DefaultRequestProcessor; -import org.apache.rocketmq.filtersrv.stats.FilterServerStatsManager; -import org.apache.rocketmq.remoting.RemotingServer; -import org.apache.rocketmq.remoting.netty.NettyRemotingServer; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; - -public class FiltersrvController { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - private final FiltersrvConfig filtersrvConfig; - - private final NettyServerConfig nettyServerConfig; - private final FilterClassManager filterClassManager; - - private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI(); - private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( - MixAll.FILTERSRV_CONSUMER_GROUP); - - private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread")); - private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager(); - - private RemotingServer remotingServer; - - private ExecutorService remotingExecutor; - private volatile String brokerName = null; - - public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) { - this.filtersrvConfig = filtersrvConfig; - this.nettyServerConfig = nettyServerConfig; - this.filterClassManager = new FilterClassManager(this); - } - - public boolean initialize() { - - MixAll.printObjectProperties(log, this.filtersrvConfig); - - this.remotingServer = new NettyRemotingServer(this.nettyServerConfig); - - this.remotingExecutor = - Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), - new ThreadFactoryImpl("RemotingExecutorThread_")); - - this.registerProcessor(); - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - FiltersrvController.this.registerFilterServerToBroker(); - } - }, 3, 10, TimeUnit.SECONDS); - - this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer - .getBrokerSuspendMaxTimeMillis() - 1000); - this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer - .getConsumerTimeoutMillisWhenSuspend() - 1000); - - this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr()); - this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid())); - - return true; - } - - private void registerProcessor() { - this.remotingServer - .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); - } - - public void registerFilterServerToBroker() { - try { - RegisterFilterServerResponseHeader responseHeader = - this.filterServerOuterAPI.registerFilterServerToBroker( - this.filtersrvConfig.getConnectWhichBroker(), this.localAddr()); - this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() - .setDefaultBrokerId(responseHeader.getBrokerId()); - - if (null == this.brokerName) { - this.brokerName = responseHeader.getBrokerName(); - } - - log.info("register filter server<{}> to broker<{}> OK, Return: {} {}", - this.localAddr(), - this.filtersrvConfig.getConnectWhichBroker(), - responseHeader.getBrokerName(), - responseHeader.getBrokerId()); - } catch (Exception e) { - log.warn("register filter server Exception", e); - - log.warn("access broker failed, kill oneself"); - System.exit(-1); - } - } - - public String localAddr() { - return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(), - this.remotingServer.localListenPort()); - } - - public void start() throws Exception { - this.defaultMQPullConsumer.start(); - this.remotingServer.start(); - this.filterServerOuterAPI.start(); - this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() - .setConnectBrokerByUser(true); - this.filterClassManager.start(); - this.filterServerStatsManager.start(); - } - - public void shutdown() { - this.remotingServer.shutdown(); - this.remotingExecutor.shutdown(); - this.scheduledExecutorService.shutdown(); - this.defaultMQPullConsumer.shutdown(); - this.filterServerOuterAPI.shutdown(); - this.filterClassManager.shutdown(); - this.filterServerStatsManager.shutdown(); - } - - public RemotingServer getRemotingServer() { - return remotingServer; - } - - public void setRemotingServer(RemotingServer remotingServer) { - this.remotingServer = remotingServer; - } - - public ExecutorService getRemotingExecutor() { - return remotingExecutor; - } - - public void setRemotingExecutor(ExecutorService remotingExecutor) { - this.remotingExecutor = remotingExecutor; - } - - public FiltersrvConfig getFiltersrvConfig() { - return filtersrvConfig; - } - - public NettyServerConfig getNettyServerConfig() { - return nettyServerConfig; - } - - public ScheduledExecutorService getScheduledExecutorService() { - return scheduledExecutorService; - } - - public FilterServerOuterAPI getFilterServerOuterAPI() { - return filterServerOuterAPI; - } - - public FilterClassManager getFilterClassManager() { - return filterClassManager; - } - - public DefaultMQPullConsumer getDefaultMQPullConsumer() { - return defaultMQPullConsumer; - } - - public String getBrokerName() { - return brokerName; - } - - public void setBrokerName(String brokerName) { - this.brokerName = brokerName; - } - - public FilterServerStatsManager getFilterServerStatsManager() { - return filterServerStatsManager; - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java deleted file mode 100644 index 9fa04b758f245011b82ab28010d0ee37400c9075..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.filtersrv; - -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.joran.JoranConfigurator; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.Callable; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; -import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.remoting.netty.NettySystemConfig; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.srvutil.ServerUtil; -import org.apache.rocketmq.srvutil.ShutdownHookThread; -import org.slf4j.LoggerFactory; - -public class FiltersrvStartup { - public static InternalLogger log; - - public static void main(String[] args) { - start(createController(args)); - } - - public static FiltersrvController start(FiltersrvController controller) { - - try { - controller.start(); - } catch (Exception e) { - e.printStackTrace(); - System.exit(-1); - } - - String tip = "The Filter Server boot success, " + controller.localAddr(); - log.info(tip); - System.out.printf("%s%n", tip); - - return controller; - } - - public static FiltersrvController createController(String[] args) { - System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { - NettySystemConfig.socketSndbufSize = 65535; - } - - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { - NettySystemConfig.socketRcvbufSize = 1024; - } - - try { - Options options = ServerUtil.buildCommandlineOptions(new Options()); - final CommandLine commandLine = - ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options), - new PosixParser()); - if (null == commandLine) { - System.exit(-1); - return null; - } - - final FiltersrvConfig filtersrvConfig = new FiltersrvConfig(); - final NettyServerConfig nettyServerConfig = new NettyServerConfig(); - - if (commandLine.hasOption('c')) { - String file = commandLine.getOptionValue('c'); - if (file != null) { - InputStream in = new BufferedInputStream(new FileInputStream(file)); - Properties properties = new Properties(); - properties.load(in); - MixAll.properties2Object(properties, filtersrvConfig); - System.out.printf("load config properties file OK, %s%n", file); - in.close(); - - String port = properties.getProperty("listenPort"); - if (port != null) { - filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port)); - } - } - } - - nettyServerConfig.setListenPort(0); - nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue()); - nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig - .getFsServerCallbackExecutorThreads()); - nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads()); - - if (commandLine.hasOption('p')) { - MixAll.printObjectProperties(null, filtersrvConfig); - MixAll.printObjectProperties(null, nettyServerConfig); - System.exit(0); - } - - MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig); - if (null == filtersrvConfig.getRocketmqHome()) { - System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); - System.exit(-2); - } - - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); - JoranConfigurator configurator = new JoranConfigurator(); - configurator.setContext(lc); - lc.reset(); - configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml"); - log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - final FiltersrvController controller = - new FiltersrvController(filtersrvConfig, nettyServerConfig); - boolean initResult = controller.initialize(); - if (!initResult) { - controller.shutdown(); - System.exit(-3); - } - - Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() { - @Override - public Void call() throws Exception { - controller.shutdown(); - return null; - } - })); - - return controller; - } catch (Throwable e) { - e.printStackTrace(); - System.exit(-1); - } - return null; - } - - public static Options buildCommandlineOptions(final Options options) { - Option opt = new Option("c", "configFile", true, "Filter server config properties file"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("p", "printConfigItem", false, "Print all config item"); - opt.setRequired(false); - options.addOption(opt); - - return options; - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java deleted file mode 100644 index bde9961f6eafc1b4f41a7cd779b817706b5f7510..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java +++ /dev/null @@ -1,386 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv.filter; - -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.OutputStreamWriter; -import java.io.UnsupportedEncodingException; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.tools.JavaCompiler; -import javax.tools.ToolProvider; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.filter.FilterAPI; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.remoting.common.RemotingHelper; -public class DynaCode { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - private static final String FILE_SP = System.getProperty("file.separator"); - - private static final String LINE_SP = System.getProperty("line.separator"); - - private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP - + UtilAll.getPid(); - - private String outPutClassPath = sourcePath; - - private ClassLoader parentClassLoader; - - private List codeStrs; - - private Map/* class */> loadClass; - - private String classpath; - - private String bootclasspath; - - private String extdirs; - - private String encoding = "UTF-8"; - - private String target; - - public DynaCode(String code) { - this(Thread.currentThread().getContextClassLoader(), Collections.singletonList(code)); - } - - public DynaCode(ClassLoader parentClassLoader, List codeStrs) { - this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs); - } - - public DynaCode(String classpath, ClassLoader parentClassLoader, List codeStrs) { - this.classpath = classpath; - this.parentClassLoader = parentClassLoader; - this.codeStrs = codeStrs; - this.loadClass = new HashMap>(codeStrs.size()); - } - - public DynaCode(List codeStrs) { - this(Thread.currentThread().getContextClassLoader(), codeStrs); - } - - private static String extractClasspath(ClassLoader cl) { - StringBuffer buf = new StringBuffer(); - while (cl != null) { - if (cl instanceof URLClassLoader) { - URL urls[] = ((URLClassLoader) cl).getURLs(); - for (int i = 0; i < urls.length; i++) { - if (buf.length() > 0) { - buf.append(File.pathSeparatorChar); - } - String s = urls[i].getFile(); - try { - s = URLDecoder.decode(s, "UTF-8"); - } catch (UnsupportedEncodingException e) { - continue; - } - File f = new File(s); - buf.append(f.getAbsolutePath()); - } - } - cl = cl.getParent(); - } - return buf.toString(); - } - - public static Class compileAndLoadClass(final String className, final String javaSource) - throws Exception { - String classSimpleName = FilterAPI.simpleClassName(className); - String javaCode = javaSource; - - final String newClassSimpleName = classSimpleName + System.currentTimeMillis(); - String newJavaCode = javaCode.replaceAll(classSimpleName, newClassSimpleName); - - List codes = new ArrayList(); - codes.add(newJavaCode); - DynaCode dc = new DynaCode(codes); - dc.compileAndLoadClass(); - Map> map = dc.getLoadClass(); - - Class clazz = map.get(getQualifiedName(newJavaCode)); - return clazz; - } - - public static String getQualifiedName(String code) { - StringBuilder sb = new StringBuilder(); - String className = getClassName(code); - if (StringUtils.isNotBlank(className)) { - - String packageName = getPackageName(code); - if (StringUtils.isNotBlank(packageName)) { - sb.append(packageName).append("."); - } - sb.append(className); - } - return sb.toString(); - } - - public static String getClassName(String code) { - String className = StringUtils.substringBefore(code, "{"); - if (StringUtils.isBlank(className)) { - return className; - } - if (StringUtils.contains(code, " class ")) { - className = StringUtils.substringAfter(className, " class "); - if (StringUtils.contains(className, " extends ")) { - className = StringUtils.substringBefore(className, " extends ").trim(); - } else if (StringUtils.contains(className, " implements ")) { - className = StringUtils.trim(StringUtils.substringBefore(className, " implements ")); - } else { - className = StringUtils.trim(className); - } - } else if (StringUtils.contains(code, " interface ")) { - className = StringUtils.substringAfter(className, " interface "); - if (StringUtils.contains(className, " extends ")) { - className = StringUtils.substringBefore(className, " extends ").trim(); - } else { - className = StringUtils.trim(className); - } - } else if (StringUtils.contains(code, " enum ")) { - className = StringUtils.trim(StringUtils.substringAfter(className, " enum ")); - } else { - return StringUtils.EMPTY; - } - return className; - } - - public static String getPackageName(String code) { - String packageName = - StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim(); - return packageName; - } - - public static String getFullClassName(String code) { - String packageName = getPackageName(code); - String className = getClassName(code); - return StringUtils.isBlank(packageName) ? className : packageName + "." + className; - } - - public void compileAndLoadClass() throws Exception { - String[] sourceFiles = this.uploadSrcFile(); - this.compile(sourceFiles); - this.loadClass(this.loadClass.keySet()); - } - - public Map> getLoadClass() { - return loadClass; - } - - private String[] uploadSrcFile() throws Exception { - List srcFileAbsolutePaths = new ArrayList(codeStrs.size()); - for (String code : codeStrs) { - if (StringUtils.isNotBlank(code)) { - String packageName = getPackageName(code); - String className = getClassName(code); - if (StringUtils.isNotBlank(className)) { - File srcFile = null; - BufferedWriter bufferWriter = null; - try { - if (StringUtils.isBlank(packageName)) { - File pathFile = new File(sourcePath); - - if (!pathFile.exists()) { - if (!pathFile.mkdirs()) { - throw new RuntimeException("create PathFile Error!"); - } - } - srcFile = new File(sourcePath + FILE_SP + className + ".java"); - } else { - String srcPath = StringUtils.replace(packageName, ".", FILE_SP); - File pathFile = new File(sourcePath + FILE_SP + srcPath); - - if (!pathFile.exists()) { - if (!pathFile.mkdirs()) { - throw new RuntimeException("create PathFile Error!"); - } - } - srcFile = new File(pathFile.getAbsolutePath() + FILE_SP + className + ".java"); - } - synchronized (loadClass) { - loadClass.put(getFullClassName(code), null); - } - if (null != srcFile) { - log.warn("Dyna Create Java Source File:----> {}", srcFile.getAbsolutePath()); - srcFileAbsolutePaths.add(srcFile.getAbsolutePath()); - srcFile.deleteOnExit(); - } - OutputStreamWriter outputStreamWriter = - new OutputStreamWriter(new FileOutputStream(srcFile), encoding); - bufferWriter = new BufferedWriter(outputStreamWriter); - for (String lineCode : code.split(LINE_SP)) { - bufferWriter.write(lineCode); - bufferWriter.newLine(); - } - bufferWriter.flush(); - } finally { - if (null != bufferWriter) { - bufferWriter.close(); - } - } - } - } - } - return srcFileAbsolutePaths.toArray(new String[srcFileAbsolutePaths.size()]); - } - - private void compile(String[] srcFiles) throws Exception { - String args[] = this.buildCompileJavacArgs(srcFiles); - ByteArrayOutputStream err = new ByteArrayOutputStream(); - JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); - if (compiler == null) { - throw new NullPointerException( - "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!"); - } - int resultCode = compiler.run(null, null, err, args); - if (resultCode != 0) { - throw new Exception(err.toString(RemotingHelper.DEFAULT_CHARSET)); - } - } - - private void loadClass(Set classFullNames) throws ClassNotFoundException, MalformedURLException { - synchronized (loadClass) { - ClassLoader classLoader = - new URLClassLoader(new URL[] {new File(outPutClassPath).toURI().toURL()}, - parentClassLoader); - for (String key : classFullNames) { - Class classz = classLoader.loadClass(key); - if (null != classz) { - loadClass.put(key, classz); - log.info("Dyna Load Java Class File OK:----> className: {}", key); - } else { - log.error("Dyna Load Java Class File Fail:----> className: {}", key); - } - } - } - } - - private String[] buildCompileJavacArgs(String srcFiles[]) { - ArrayList args = new ArrayList(); - if (StringUtils.isNotBlank(classpath)) { - args.add("-classpath"); - args.add(classpath); - } - if (StringUtils.isNotBlank(outPutClassPath)) { - args.add("-d"); - args.add(outPutClassPath); - } - if (StringUtils.isNotBlank(sourcePath)) { - args.add("-sourcepath"); - args.add(sourcePath); - } - if (StringUtils.isNotBlank(bootclasspath)) { - args.add("-bootclasspath"); - args.add(bootclasspath); - } - if (StringUtils.isNotBlank(extdirs)) { - args.add("-extdirs"); - args.add(extdirs); - } - if (StringUtils.isNotBlank(encoding)) { - args.add("-encoding"); - args.add(encoding); - } - if (StringUtils.isNotBlank(target)) { - args.add("-target"); - args.add(target); - } - for (int i = 0; i < srcFiles.length; i++) { - args.add(srcFiles[i]); - } - return args.toArray(new String[args.size()]); - } - - public String getOutPutClassPath() { - return outPutClassPath; - } - - public void setOutPutClassPath(String outPutClassPath) { - this.outPutClassPath = outPutClassPath; - } - - public String getSourcePath() { - return sourcePath; - } - - public void setSourcePath(String sourcePath) { - this.sourcePath = sourcePath; - } - - public ClassLoader getParentClassLoader() { - return parentClassLoader; - } - - public void setParentClassLoader(ClassLoader parentClassLoader) { - this.parentClassLoader = parentClassLoader; - } - - public String getClasspath() { - return classpath; - } - - public void setClasspath(String classpath) { - this.classpath = classpath; - } - - public String getBootclasspath() { - return bootclasspath; - } - - public void setBootclasspath(String bootclasspath) { - this.bootclasspath = bootclasspath; - } - - public String getExtdirs() { - return extdirs; - } - - public void setExtdirs(String extdirs) { - this.extdirs = extdirs; - } - - public String getEncoding() { - return encoding; - } - - public void setEncoding(String encoding) { - this.encoding = encoding; - } - - public String getTarget() { - return target; - } - - public void setTarget(String target) { - this.target = target; - } -} \ No newline at end of file diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java deleted file mode 100644 index 15e1bb094e3800e80685d30858b6ec3a51aa6aed..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv.filter; - -public interface FilterClassFetchMethod { - public String fetch(final String topic, final String consumerGroup, final String className); -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java deleted file mode 100644 index b6753928374ff958023e50b8ac221d0b32773e3e..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv.filter; - -import org.apache.rocketmq.common.filter.MessageFilter; - -public class FilterClassInfo { - private String className; - private int classCRC; - private MessageFilter messageFilter; - - public int getClassCRC() { - return classCRC; - } - - public void setClassCRC(int classCRC) { - this.classCRC = classCRC; - } - - public MessageFilter getMessageFilter() { - return messageFilter; - } - - public void setMessageFilter(MessageFilter messageFilter) { - this.messageFilter = messageFilter; - } - - public String getClassName() { - return className; - } - - public void setClassName(String className) { - this.className = className; - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java deleted file mode 100644 index 70f714aa0918fb0e0123f533e97194aab42348ce..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv.filter; - -public class FilterClassLoader extends ClassLoader { - public final Class createNewClass(String name, byte[] b, int off, int len) throws ClassFormatError { - return this.defineClass(name, b, off, len); - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java deleted file mode 100644 index 360341c8247abeaed7f080e05cd255511a68e950..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv.filter; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.filter.MessageFilter; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.filtersrv.FiltersrvController; - -public class FilterClassManager { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - private final Object compileLock = new Object(); - private final FiltersrvController filtersrvController; - - private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); - private ConcurrentMap filterClassTable = - new ConcurrentHashMap(128); - private FilterClassFetchMethod filterClassFetchMethod; - - public FilterClassManager(FiltersrvController filtersrvController) { - this.filtersrvController = filtersrvController; - this.filterClassFetchMethod = - new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig() - .getFilterClassRepertoryUrl()); - } - - private static String buildKey(final String consumerGroup, final String topic) { - return topic + "@" + consumerGroup; - } - - public void start() { - if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - fetchClassFromRemoteHost(); - } - }, 1, 1, TimeUnit.MINUTES); - } - } - - private void fetchClassFromRemoteHost() { - Iterator> it = this.filterClassTable.entrySet().iterator(); - while (it.hasNext()) { - try { - Entry next = it.next(); - FilterClassInfo filterClassInfo = next.getValue(); - String[] topicAndGroup = next.getKey().split("@"); - String responseStr = - this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1], - filterClassInfo.getClassName()); - byte[] filterSourceBinary = responseStr.getBytes("UTF-8"); - int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8")); - if (classCRC != filterClassInfo.getClassCRC()) { - String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); - Class newClass = - DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource); - Object newInstance = newClass.newInstance(); - filterClassInfo.setMessageFilter((MessageFilter) newInstance); - filterClassInfo.setClassCRC(classCRC); - - log.info("fetch Remote class File OK, {} {}", next.getKey(), - filterClassInfo.getClassName()); - } - } catch (Exception e) { - log.error("fetchClassFromRemoteHost Exception", e); - } - } - } - - public void shutdown() { - this.scheduledExecutorService.shutdown(); - } - - public boolean registerFilterClass(final String consumerGroup, final String topic, - final String className, final int classCRC, final byte[] filterSourceBinary) { - final String key = buildKey(consumerGroup, topic); - - boolean registerNew = false; - FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key); - if (null == filterClassInfoPrev) { - registerNew = true; - } else { - if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { - if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { - registerNew = true; - } - } - } - - if (registerNew) { - synchronized (this.compileLock) { - filterClassInfoPrev = this.filterClassTable.get(key); - if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) { - return true; - } - - try { - - FilterClassInfo filterClassInfoNew = new FilterClassInfo(); - filterClassInfoNew.setClassName(className); - filterClassInfoNew.setClassCRC(0); - filterClassInfoNew.setMessageFilter(null); - - if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { - String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); - Class newClass = DynaCode.compileAndLoadClass(className, javaSource); - Object newInstance = newClass.newInstance(); - filterClassInfoNew.setMessageFilter((MessageFilter) newInstance); - filterClassInfoNew.setClassCRC(classCRC); - } - - this.filterClassTable.put(key, filterClassInfoNew); - } catch (Throwable e) { - String info = - String - .format( - "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s", - consumerGroup, topic, className); - log.error(info, e); - return false; - } - } - } - - return true; - } - - public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) { - return this.filterClassTable.get(buildKey(consumerGroup, topic)); - } - - public FilterClassFetchMethod getFilterClassFetchMethod() { - return filterClassFetchMethod; - } - - public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) { - this.filterClassFetchMethod = filterClassFetchMethod; - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java deleted file mode 100644 index ebd59cd86cf88500ad5c729ee6f63ebb2ea63d0d..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv.filter; - -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.common.utils.HttpTinyClient; -import org.apache.rocketmq.common.utils.HttpTinyClient.HttpResult; - -public class HttpFilterClassFetchMethod implements FilterClassFetchMethod { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - private final String url; - - public HttpFilterClassFetchMethod(String url) { - this.url = url; - } - - @Override - public String fetch(String topic, String consumerGroup, String className) { - String thisUrl = String.format("%s/%s.java", this.url, className); - - try { - HttpResult result = HttpTinyClient.httpGet(thisUrl, null, null, "UTF-8", 5000); - if (200 == result.code) { - return result.content; - } - } catch (Exception e) { - log.error( - String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e); - } - - return null; - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java deleted file mode 100644 index d5335bb6625cb3b1181e127eba19be4d2a960933..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv.processor; - -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.client.consumer.PullCallback; -import org.apache.rocketmq.client.consumer.PullResult; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.filter.FilterContext; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; -import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; -import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; -import org.apache.rocketmq.common.sysflag.MessageSysFlag; -import org.apache.rocketmq.filtersrv.FiltersrvController; -import org.apache.rocketmq.filtersrv.filter.FilterClassInfo; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.store.CommitLog; - -public class DefaultRequestProcessor implements NettyRequestProcessor { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - private final FiltersrvController filtersrvController; - - public DefaultRequestProcessor(FiltersrvController filtersrvController) { - this.filtersrvController = filtersrvController; - } - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { - if (ctx != null) { - log.debug("receive request, {} {} {}", - request.getCode(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - request); - } - - switch (request.getCode()) { - case RequestCode.REGISTER_MESSAGE_FILTER_CLASS: - return registerMessageFilterClass(ctx, request); - case RequestCode.PULL_MESSAGE: - return pullMessageForward(ctx, request); - } - - return null; - } - - @Override - public boolean rejectRequest() { - return false; - } - - private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final RegisterMessageFilterClassRequestHeader requestHeader = - (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class); - - try { - boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), - requestHeader.getClassName(), - requestHeader.getClassCRC(), - request.getBody()); - if (!ok) { - throw new Exception("registerFilterClass error"); - } - } catch (Exception e) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(RemotingHelper.exceptionSimpleDesc(e)); - return response; - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, - final RemotingCommand request) throws Exception { - final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); - final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); - final PullMessageRequestHeader requestHeader = - (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); - - final FilterContext filterContext = new FilterContext(); - filterContext.setConsumerGroup(requestHeader.getConsumerGroup()); - - response.setOpaque(request.getOpaque()); - - DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer(); - final FilterClassInfo findFilterClass = - this.filtersrvController.getFilterClassManager() - .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic()); - if (null == findFilterClass) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("Find Filter class failed, not registered"); - return response; - } - - if (null == findFilterClass.getMessageFilter()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("Find Filter class failed, registered but no class"); - return response; - } - - responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); - - MessageQueue mq = new MessageQueue(); - mq.setTopic(requestHeader.getTopic()); - mq.setQueueId(requestHeader.getQueueId()); - mq.setBrokerName(this.filtersrvController.getBrokerName()); - long offset = requestHeader.getQueueOffset(); - int maxNums = requestHeader.getMaxMsgNums(); - - final PullCallback pullCallback = new PullCallback() { - - @Override - public void onSuccess(PullResult pullResult) { - responseHeader.setMaxOffset(pullResult.getMaxOffset()); - responseHeader.setMinOffset(pullResult.getMinOffset()); - responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset()); - response.setRemark(null); - - switch (pullResult.getPullStatus()) { - case FOUND: - response.setCode(ResponseCode.SUCCESS); - - List msgListOK = new ArrayList(); - try { - for (MessageExt msg : pullResult.getMsgFoundList()) { - boolean match = findFilterClass.getMessageFilter().match(msg, filterContext); - if (match) { - msgListOK.add(msg); - } - } - - if (!msgListOK.isEmpty()) { - returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK); - return; - } else { - response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); - } - } catch (Throwable e) { - final String error = - String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ", - requestHeader.getConsumerGroup(), requestHeader.getTopic()); - log.error(error, e); - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e)); - returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); - return; - } - - break; - case NO_MATCHED_MSG: - response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); - break; - case NO_NEW_MSG: - response.setCode(ResponseCode.PULL_NOT_FOUND); - break; - case OFFSET_ILLEGAL: - response.setCode(ResponseCode.PULL_OFFSET_MOVED); - break; - default: - break; - } - - returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); - } - - @Override - public void onException(Throwable e) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e)); - returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); - return; - } - }; - - pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback); - - return null; - } - - private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, - final RemotingCommand response, - final List msgList) { - if (null != msgList) { - ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()]; - int bodyTotalSize = 0; - for (int i = 0; i < msgList.size(); i++) { - try { - msgBufferList[i] = messageToByteBuffer(msgList.get(i)); - bodyTotalSize += msgBufferList[i].capacity(); - } catch (Exception e) { - log.error("messageToByteBuffer UnsupportedEncodingException", e); - } - } - - ByteBuffer body = ByteBuffer.allocate(bodyTotalSize); - for (ByteBuffer bb : msgBufferList) { - bb.flip(); - body.put(bb); - } - - response.setBody(body.array()); - - this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size()); - - this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize); - } - - try { - ctx.writeAndFlush(response).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - log.error("FilterServer response to " + future.channel().remoteAddress() + " failed", future.cause()); - log.error(response.toString()); - } - } - }); - } catch (Throwable e) { - log.error("FilterServer process request over, but response failed", e); - log.error(response.toString()); - } - } - - private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException { - int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag()); - if (msg.getBody() != null) { - if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) { - byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel()); - if (data != null) { - msg.setBody(data); - sysFlag |= MessageSysFlag.COMPRESSED_FLAG; - } - } - } - - final int bodyLength = msg.getBody() != null ? msg.getBody().length : 0; - byte[] topicData = msg.getTopic().getBytes(MixAll.DEFAULT_CHARSET); - final int topicLength = topicData.length; - String properties = MessageDecoder.messageProperties2String(msg.getProperties()); - byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET); - final int propertiesLength = propertiesData.length; - final int msgLen = 4 // 1 TOTALSIZE - + 4 // 2 MAGICCODE - + 4 // 3 BODYCRC - + 4 // 4 QUEUEID - + 4 // 5 FLAG - + 8 // 6 QUEUEOFFSET - + 8 // 7 PHYSICALOFFSET - + 4 // 8 SYSFLAG - + 8 // 9 BORNTIMESTAMP - + 8 // 10 BORNHOST - + 8 // 11 STORETIMESTAMP - + 8 // 12 STOREHOSTADDRESS - + 4 // 13 RECONSUMETIMES - + 8 // 14 Prepared Transaction Offset - + 4 + bodyLength // 14 BODY - + 1 + topicLength // 15 TOPIC - + 2 + propertiesLength // 16 propertiesLength - + 0; - - ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen); - - final MessageExt msgInner = msg; - - // 1 TOTALSIZE - msgStoreItemMemory.putInt(msgLen); - // 2 MAGICCODE - msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); - // 3 BODYCRC - msgStoreItemMemory.putInt(UtilAll.crc32(msgInner.getBody())); - // 4 QUEUEID - msgStoreItemMemory.putInt(msgInner.getQueueId()); - // 5 FLAG - msgStoreItemMemory.putInt(msgInner.getFlag()); - // 6 QUEUEOFFSET - msgStoreItemMemory.putLong(msgInner.getQueueOffset()); - // 7 PHYSICALOFFSET - msgStoreItemMemory.putLong(msgInner.getCommitLogOffset()); - // 8 SYSFLAG - msgStoreItemMemory.putInt(sysFlag); - // 9 BORNTIMESTAMP - msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); - // 10 BORNHOST - msgStoreItemMemory.put(msgInner.getBornHostBytes()); - // 11 STORETIMESTAMP - msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); - // 12 STOREHOSTADDRESS - msgStoreItemMemory.put(msgInner.getStoreHostBytes()); - // 13 RECONSUMETIMES - msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); - // 14 Prepared Transaction Offset - msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); - // 15 BODY - msgStoreItemMemory.putInt(bodyLength); - if (bodyLength > 0) - msgStoreItemMemory.put(msgInner.getBody()); - // 16 TOPIC - msgStoreItemMemory.put((byte) topicLength); - msgStoreItemMemory.put(topicData); - // 17 PROPERTIES - msgStoreItemMemory.putShort((short) propertiesLength); - if (propertiesLength > 0) - msgStoreItemMemory.put(propertiesData); - - return msgStoreItemMemory; - } -} diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java deleted file mode 100644 index 13bc834ff5aa8a6824eff13e512077caf209cb8a..0000000000000000000000000000000000000000 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.filtersrv.stats; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.common.stats.StatsItemSet; - -public class FilterServerStatsManager { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread")); - - // ConsumerGroup Get Nums - private final StatsItemSet groupGetNums = new StatsItemSet("GROUP_GET_NUMS", - this.scheduledExecutorService, log); - - // ConsumerGroup Get Size - private final StatsItemSet groupGetSize = new StatsItemSet("GROUP_GET_SIZE", - this.scheduledExecutorService, log); - - public FilterServerStatsManager() { - } - - public void start() { - } - - public void shutdown() { - this.scheduledExecutorService.shutdown(); - } - - public void incGroupGetNums(final String group, final String topic, final int incValue) { - this.groupGetNums.addValue(topic + "@" + group, incValue, 1); - } - - public void incGroupGetSize(final String group, final String topic, final int incValue) { - this.groupGetSize.addValue(topic + "@" + group, incValue, 1); - } -} diff --git a/logging/src/test/java/org/apache/rocketmq/logging/BasicLoggerTest.java b/logging/src/test/java/org/apache/rocketmq/logging/BasicLoggerTest.java index 28496dd567482511f4e435af4e2f2adf518f0327..c198704de268e424a82b520d393caa56735602b6 100644 --- a/logging/src/test/java/org/apache/rocketmq/logging/BasicLoggerTest.java +++ b/logging/src/test/java/org/apache/rocketmq/logging/BasicLoggerTest.java @@ -17,17 +17,16 @@ package org.apache.rocketmq.logging; -import org.apache.rocketmq.logging.inner.Level; -import org.apache.rocketmq.logging.inner.Logger; -import org.apache.rocketmq.logging.inner.LoggingEvent; -import org.junit.After; -import org.junit.Before; - import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; +import org.apache.rocketmq.logging.inner.Level; +import org.apache.rocketmq.logging.inner.Logger; +import org.apache.rocketmq.logging.inner.LoggingEvent; +import org.junit.After; +import org.junit.Before; public class BasicLoggerTest { diff --git a/logging/src/test/java/org/apache/rocketmq/logging/InternalLoggerTest.java b/logging/src/test/java/org/apache/rocketmq/logging/InternalLoggerTest.java index 04b9f06e880b8ea30ace09e2cd50b6a6c40c43f2..50f1dd1c9bd345f91a722e40390ba323a8db850a 100644 --- a/logging/src/test/java/org/apache/rocketmq/logging/InternalLoggerTest.java +++ b/logging/src/test/java/org/apache/rocketmq/logging/InternalLoggerTest.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.logging; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; import org.apache.rocketmq.logging.inner.Appender; import org.apache.rocketmq.logging.inner.Level; import org.apache.rocketmq.logging.inner.Logger; @@ -25,13 +27,8 @@ import org.apache.rocketmq.logging.inner.SysLogger; import org.junit.Assert; import org.junit.Test; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; - - public class InternalLoggerTest { - @Test public void testInternalLogger() { SysLogger.setQuietMode(false); @@ -44,7 +41,6 @@ public class InternalLoggerTest { .withConsoleAppender(LoggingBuilder.SYSTEM_OUT) .withLayout(LoggingBuilder.newLayoutBuilder().withDefaultLayout().build()).build(); - Logger consoleLogger = Logger.getLogger("ConsoleLogger"); consoleLogger.setAdditivity(false); consoleLogger.addAppender(consoleAppender); diff --git a/logging/src/test/java/org/apache/rocketmq/logging/Slf4jLoggerFactoryTest.java b/logging/src/test/java/org/apache/rocketmq/logging/Slf4jLoggerFactoryTest.java index ba6ec3ba68ccd82c5ef284a5bf105ed91ee1f4d1..4bed745b3c454f2a87c00d3c9e9726a57dcc000a 100644 --- a/logging/src/test/java/org/apache/rocketmq/logging/Slf4jLoggerFactoryTest.java +++ b/logging/src/test/java/org/apache/rocketmq/logging/Slf4jLoggerFactoryTest.java @@ -59,7 +59,7 @@ public class Slf4jLoggerFactoryTest extends BasicLoggerTest { String file = loggingDir + "/logback_test.log"; logger.info("logback slf4j info Message"); - logger.error("logback slf4j error Message", new RuntimeException()); + logger.error("logback slf4j error Message", new RuntimeException("test")); logger.debug("logback slf4j debug message"); logger3.info("logback info message"); logger3.error("logback error message"); diff --git a/pom.xml b/pom.xml index ef4f9fd3569dc7e2ceea41e7b4498ab420d6f6e7..feb9a5b30ec43bac26af5611a18c57a73765b2fc 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,6 @@ remoting logappender example - filtersrv srvutil filter test @@ -514,11 +513,6 @@ rocketmq-test ${project.version} - - ${project.groupId} - rocketmq-filtersrv - ${project.version} - ${project.groupId} rocketmq-srvutil