提交 9e66da4b 编写于 作者: Z zongtanghu

Merge branch 'develop_include_prs_after_4.9.1_version', remote-tracking branch...

Merge branch 'develop_include_prs_after_4.9.1_version', remote-tracking branch 'origin' into develop
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
name: Little RocketMQ
on: [pull_request_target, issues]
uses: actions/first-interaction@v1.1.0
with:
# Token for the repository. Can be passed in using {{ secrets.GITHUB_TOKEN }}
repo-token: ${{ secrets.GITHUB_TOKEN }}
# Comment to post on an individual's first issue
issue-message: 'Make sure your issue is not the existence through the issue search. Follow the issue template, make more details for us. But please be aware that Issue should not be used for FAQs: if you have a question or are simply not sure if it is really an issue or not, please contact [us](https://rocketmq.apache.org/about/contact/) first before you create a new issue.'
# Comment to post on an individual's first pull request
pr-message: 'We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).'
......@@ -167,7 +167,7 @@ public class RemoteAddressStrategyFactory {
String[] strArray = StringUtils.split(remoteAddr, ".");
if (analysis(strArray, 1) || analysis(strArray, 2) || analysis(strArray, 3)) {
AclUtils.verify(remoteAddr, index - 1);
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (int j = 0; j < index; j++) {
sb.append(strArray[j].trim()).append(".");
}
......
......@@ -565,11 +565,11 @@ public class UtilAll {
if (list == null || list.size() == 0) {
return null;
}
StringBuffer str = new StringBuffer();
StringBuilder str = new StringBuilder();
for (int i = 0; i < list.size(); i++) {
str.append(list.get(i));
if (i == list.size() - 1) {
continue;
break;
}
str.append(splitor);
}
......
......@@ -43,11 +43,13 @@ public class Message implements Serializable {
this.flag = flag;
this.body = body;
if (tags != null && tags.length() > 0)
if (tags != null && tags.length() > 0) {
this.setTags(tags);
}
if (keys != null && keys.length() > 0)
if (keys != null && keys.length() > 0) {
this.setKeys(keys);
}
this.setWaitStoreMsgOK(waitStoreMsgOK);
}
......@@ -127,7 +129,7 @@ public class Message implements Serializable {
}
public void setKeys(Collection<String> keys) {
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (String k : keys) {
sb.append(k);
sb.append(MessageConst.KEY_SEPARATOR);
......@@ -151,8 +153,9 @@ public class Message implements Serializable {
public boolean isWaitStoreMsgOK() {
String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
if (null == result)
if (null == result) {
return true;
}
return Boolean.parseBoolean(result);
}
......
......@@ -120,7 +120,7 @@ public class NamespaceUtil {
return null;
}
return new StringBuffer()
return new StringBuilder()
.append(MixAll.RETRY_GROUP_TOPIC_PREFIX)
.append(wrapNamespace(namespace, consumerGroup))
.toString();
......
......@@ -19,11 +19,15 @@ package org.apache.rocketmq.common;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
import static org.junit.Assert.assertEquals;
public class UtilAllTest {
......@@ -109,6 +113,15 @@ public class UtilAllTest {
assertThat(UtilAll.ipToIPv6Str(nonInternal.getAddress()).toUpperCase()).isEqualTo("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
}
@Test
public void testList2String() {
List<String> list = Arrays.asList("groupA=DENY", "groupB=PUB|SUB", "groupC=SUB");
String comma = ",";
assertEquals("groupA=DENY,groupB=PUB|SUB,groupC=SUB", UtilAll.list2String(list, comma));
assertEquals(null, UtilAll.list2String(null, comma));
assertEquals(null, UtilAll.list2String(Collections.emptyList(), comma));
}
static class DemoConfig {
private int demoWidth = 0;
private int demoLength = 0;
......
......@@ -82,7 +82,7 @@ public class IOTinyUtilsTest {
@Test
public void testReadLines() throws Exception {
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10; i++) {
sb.append("testReadLines").append("\n");
}
......@@ -95,7 +95,7 @@ public class IOTinyUtilsTest {
@Test
public void testToBufferedReader() throws Exception {
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10; i++) {
sb.append("testToBufferedReader").append("\n");
}
......
......@@ -120,13 +120,15 @@ public class AsyncProducer {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
......
......@@ -17,18 +17,11 @@
package org.apache.rocketmq.example.benchmark;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
......@@ -42,6 +35,16 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class Consumer {
public static void main(String[] args) throws MQClientException, IOException {
......@@ -71,11 +74,12 @@ public class Consumer {
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
final Timer timer = new Timer("BenchmarkTimerThread", true);
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
......@@ -83,9 +87,9 @@ public class Consumer {
snapshotList.removeFirst();
}
}
}, 1000, 1000);
}, 1000, 1000, TimeUnit.MILLISECONDS);
timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst();
......@@ -116,7 +120,7 @@ public class Consumer {
e.printStackTrace();
}
}
}, 10000, 10000);
}, 10000, 10000, TimeUnit.MILLISECONDS);
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
......
......@@ -16,32 +16,34 @@
*/
package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
public class Producer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
......@@ -73,7 +75,8 @@ public class Producer {
final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer();
final Timer timer = new Timer("BenchmarkTimerThread", true);
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
......@@ -87,7 +90,7 @@ public class Producer {
}
}
timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmark.createSnapshot());
......@@ -95,9 +98,9 @@ public class Producer {
snapshotList.removeFirst();
}
}
}, 1000, 1000);
}, 1000, 1000, TimeUnit.MILLISECONDS);
timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
doPrintStats(snapshotList, statsBenchmark, false);
......@@ -112,7 +115,7 @@ public class Producer {
e.printStackTrace();
}
}
}, 10000, 10000);
}, 10000, 10000, TimeUnit.MILLISECONDS);
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
......@@ -224,7 +227,12 @@ public class Producer {
try {
sendThreadPool.shutdown();
sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
timer.cancel();
executorService.shutdown();
try {
executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (snapshotList.size() > 1) {
doPrintStats(snapshotList, statsBenchmark, true);
} else {
......
......@@ -17,26 +17,11 @@
package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
......@@ -48,6 +33,24 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TransactionProducer {
private static final long START_TIME = System.currentTimeMillis();
private static final AtomicLong MSG_COUNT = new AtomicLong(0);
......@@ -75,11 +78,12 @@ public class TransactionProducer {
final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
final Timer timer = new Timer("BenchmarkTimerThread", true);
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Snapshot> snapshotList = new LinkedList<>();
timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmark.createSnapshot());
......@@ -87,9 +91,9 @@ public class TransactionProducer {
snapshotList.removeFirst();
}
}
}, 1000, 1000);
}, 1000, 1000, TimeUnit.MILLISECONDS);
timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
Snapshot begin = snapshotList.getFirst();
......@@ -121,7 +125,7 @@ public class TransactionProducer {
e.printStackTrace();
}
}
}, 10000, 10000);
}, 10000, 10000, TimeUnit.MILLISECONDS);
final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
final TransactionMQProducer producer = new TransactionMQProducer(
......
......@@ -53,6 +53,7 @@ public abstract class UnaryExpression implements Expression {
public static Expression createNegate(Expression left) {
return new UnaryExpression(left, UnaryType.NEGATE) {
@Override
public Object evaluate(EvaluationContext context) throws Exception {
Object rvalue = right.evaluate(context);
if (rvalue == null) {
......@@ -64,6 +65,7 @@ public abstract class UnaryExpression implements Expression {
return null;
}
@Override
public String getExpressionSymbol() {
return "-";
}
......@@ -85,6 +87,7 @@ public abstract class UnaryExpression implements Expression {
final Collection inList = t;
return new UnaryInExpression(right, UnaryType.IN, inList, not) {
@Override
public Object evaluate(EvaluationContext context) throws Exception {
Object rvalue = right.evaluate(context);
......@@ -103,8 +106,9 @@ public abstract class UnaryExpression implements Expression {
}
@Override
public String toString() {
StringBuffer answer = new StringBuffer();
StringBuilder answer = new StringBuilder();
answer.append(right);
answer.append(" ");
answer.append(getExpressionSymbol());
......@@ -124,6 +128,7 @@ public abstract class UnaryExpression implements Expression {
return answer.toString();
}
@Override
public String getExpressionSymbol() {
if (not) {
return "NOT IN";
......@@ -139,6 +144,7 @@ public abstract class UnaryExpression implements Expression {
super(left, unaryType);
}
@Override
public boolean matches(EvaluationContext context) throws Exception {
Object object = evaluate(context);
return object != null && object == Boolean.TRUE;
......@@ -147,6 +153,7 @@ public abstract class UnaryExpression implements Expression {
public static BooleanExpression createNOT(BooleanExpression left) {
return new BooleanUnaryExpression(left, UnaryType.NOT) {
@Override
public Object evaluate(EvaluationContext context) throws Exception {
Boolean lvalue = (Boolean) right.evaluate(context);
if (lvalue == null) {
......@@ -155,6 +162,7 @@ public abstract class UnaryExpression implements Expression {
return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
}
@Override
public String getExpressionSymbol() {
return "NOT";
}
......@@ -163,6 +171,7 @@ public abstract class UnaryExpression implements Expression {
public static BooleanExpression createBooleanCast(Expression left) {
return new BooleanUnaryExpression(left, UnaryType.BOOLEANCAST) {
@Override
public Object evaluate(EvaluationContext context) throws Exception {
Object rvalue = right.evaluate(context);
if (rvalue == null) {
......@@ -174,10 +183,12 @@ public abstract class UnaryExpression implements Expression {
return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
}
@Override
public String toString() {
return right.toString();
}
@Override
public String getExpressionSymbol() {
return "";
}
......@@ -233,6 +244,7 @@ public abstract class UnaryExpression implements Expression {
/**
* @see Object#toString()
*/
@Override
public String toString() {
return "(" + getExpressionSymbol() + " " + right.toString() + ")";
}
......@@ -240,6 +252,7 @@ public abstract class UnaryExpression implements Expression {
/**
* @see Object#hashCode()
*/
@Override
public int hashCode() {
return toString().hashCode();
}
......@@ -247,6 +260,7 @@ public abstract class UnaryExpression implements Expression {
/**
* @see Object#equals(Object)
*/
@Override
public boolean equals(Object o) {
if (o == null || !this.getClass().equals(o.getClass())) {
......
......@@ -106,7 +106,7 @@ public class ParseException extends Exception {
int[][] expectedTokenSequences,
String[] tokenImage) {
String eol = System.getProperty("line.separator", "\n");
StringBuffer expected = new StringBuffer();
StringBuilder expected = new StringBuilder();
int maxSize = 0;
for (int i = 0; i < expectedTokenSequences.length; i++) {
if (maxSize < expectedTokenSequences[i].length) {
......@@ -123,8 +123,9 @@ public class ParseException extends Exception {
String retval = "Encountered \"";
Token tok = currentToken.next;
for (int i = 0; i < maxSize; i++) {
if (i != 0)
if (i != 0) {
retval += " ";
}
if (tok.kind == 0) {
retval += tokenImage[0];
break;
......@@ -157,7 +158,7 @@ public class ParseException extends Exception {
* string literal.
*/
static String add_escapes(String str) {
StringBuffer retval = new StringBuffer();
StringBuilder retval = new StringBuilder();
char ch;
for (int i = 0; i < str.length(); i++) {
switch (str.charAt(i)) {
......
......@@ -66,7 +66,7 @@ public class TokenMgrError extends Error {
* equivalents in the given string
*/
protected static final String addEscapes(String str) {
StringBuffer retval = new StringBuffer();
StringBuilder retval = new StringBuilder();
char ch;
for (int i = 0; i < str.length(); i++) {
switch (str.charAt(i)) {
......@@ -141,6 +141,7 @@ public class TokenMgrError extends Error {
* <p/>
* from this method for such cases in the release version of your parser.
*/
@Override
public String getMessage() {
return super.getMessage();
}
......
......@@ -84,7 +84,7 @@ public class ParserTest {
@Test
public void testParse_floatOverFlow() {
try {
StringBuffer sb = new StringBuffer(210000);
StringBuilder sb = new StringBuilder(210000);
sb.append("1");
for (int i = 0; i < 2048; i ++) {
sb.append("111111111111111111111111111111111111111111111111111");
......
......@@ -36,7 +36,7 @@ public class RemotingHelper {
private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING);
public static String exceptionSimpleDesc(final Throwable e) {
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
if (e != null) {
sb.append(e.toString());
......
......@@ -159,7 +159,7 @@ public class BrokerStatsManager {
}
public String buildStatsKey(String topic, String group) {
StringBuffer strBuilder = new StringBuffer();
StringBuilder strBuilder = new StringBuilder();
strBuilder.append(topic);
strBuilder.append("@");
strBuilder.append(group);
......@@ -217,7 +217,7 @@ public class BrokerStatsManager {
}
public String buildCommercialStatsKey(String owner, String topic, String group, String type) {
StringBuffer strBuilder = new StringBuffer();
StringBuilder strBuilder = new StringBuilder();
strBuilder.append(owner);
strBuilder.append("@");
strBuilder.append(topic);
......
......@@ -114,7 +114,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
private static String createBodyFile(MessageExt msg, int index) throws IOException {
DataOutputStream dos = null;
try {
StringBuffer bodyTmpFilePath = new StringBuffer("/tmp/rocketmq/msgbodys");
StringBuilder bodyTmpFilePath = new StringBuilder("/tmp/rocketmq/msgbodys");
File file = new File(bodyTmpFilePath.toString());
if (!file.exists()) {
file.mkdirs();
......@@ -127,8 +127,9 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
dos.write(msg.getBody());
return bodyTmpFilePath.toString();
} finally {
if (dos != null)
if (dos != null) {
dos.close();
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册