提交 691b7121 编写于 作者: H Hu Zongtang 提交者: Zhendong Liu

[ISSUE#662]capitalizing the first letter in comments and removing the unless...

[ISSUE#662]capitalizing the first letter in comments and removing the unless comments for acl and msg trace feature codes. (#669)

* [ISSUE#662]capitalizing the first letter in comments and removing the  unless comments for acl and msg trace feature codes.

* [ISSUE#662]polish the acl feature codes.
上级 ea8d5179
...@@ -24,7 +24,8 @@ public interface AccessValidator { ...@@ -24,7 +24,8 @@ public interface AccessValidator {
* Parse to get the AccessResource(user, resource, needed permission) * Parse to get the AccessResource(user, resource, needed permission)
* *
* @param request * @param request
* @return * @param remoteAddr
* @return Plain access resource result,include access key,signature and some other access attributes.
*/ */
AccessResource parse(RemotingCommand request, String remoteAddr); AccessResource parse(RemotingCommand request, String remoteAddr);
......
...@@ -44,7 +44,8 @@ public class AclClientRPCHook implements RPCHook { ...@@ -44,7 +44,8 @@ public class AclClientRPCHook implements RPCHook {
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
request.addExtField(SIGNATURE, signature); request.addExtField(SIGNATURE, signature);
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
// The SecurityToken value is unneccessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) { if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
} }
...@@ -57,14 +58,14 @@ public class AclClientRPCHook implements RPCHook { ...@@ -57,14 +58,14 @@ public class AclClientRPCHook implements RPCHook {
protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) { protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) {
CommandCustomHeader header = request.readCustomHeader(); CommandCustomHeader header = request.readCustomHeader();
// sort property // Sort property
SortedMap<String, String> map = new TreeMap<String, String>(); SortedMap<String, String> map = new TreeMap<String, String>();
map.put(ACCESS_KEY, ak); map.put(ACCESS_KEY, ak);
if (securityToken != null) { if (securityToken != null) {
map.put(SECURITY_TOKEN, securityToken); map.put(SECURITY_TOKEN, securityToken);
} }
try { try {
// add header properties // Add header properties
if (null != header) { if (null != header) {
Field[] fields = fieldCache.get(header.getClass()); Field[] fields = fieldCache.get(header.getClass());
if (null == fields) { if (null == fields) {
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.rocketmq.acl.common; package org.apache.rocketmq.acl.common;
public class AclException extends RuntimeException { public class AclException extends RuntimeException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = -7256002576788700354L;
private String status; private String status;
private int code; private int code;
...@@ -34,12 +34,6 @@ public class AclException extends RuntimeException { ...@@ -34,12 +34,6 @@ public class AclException extends RuntimeException {
this.code = code; this.code = code;
} }
public AclException(String status, int code, Throwable throwable) {
super(throwable);
this.status = status;
this.code = code;
}
public AclException(String message) { public AclException(String message) {
super(message); super(message);
} }
......
...@@ -33,11 +33,11 @@ public class Permission { ...@@ -33,11 +33,11 @@ public class Permission {
public static final Set<Integer> ADMIN_CODE = new HashSet<Integer>(); public static final Set<Integer> ADMIN_CODE = new HashSet<Integer>();
static { static {
// UPDATE_AND_CREATE_TOPIC // UPDATE_AND_CREATE_TOPIC
ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_TOPIC); ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_TOPIC);
// UPDATE_BROKER_CONFIG // UPDATE_BROKER_CONFIG
ADMIN_CODE.add(RequestCode.UPDATE_BROKER_CONFIG); ADMIN_CODE.add(RequestCode.UPDATE_BROKER_CONFIG);
// DELETE_TOPIC_IN_BROKER // DELETE_TOPIC_IN_BROKER
ADMIN_CODE.add(RequestCode.DELETE_TOPIC_IN_BROKER); ADMIN_CODE.add(RequestCode.DELETE_TOPIC_IN_BROKER);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP // UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP); ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP);
......
...@@ -24,7 +24,7 @@ import org.apache.rocketmq.common.MixAll; ...@@ -24,7 +24,7 @@ import org.apache.rocketmq.common.MixAll;
public class PlainAccessResource implements AccessResource { public class PlainAccessResource implements AccessResource {
//identify the user // Identify the user
private String accessKey; private String accessKey;
private String secretKey; private String secretKey;
......
...@@ -111,7 +111,7 @@ public class PlainAccessValidator implements AccessValidator { ...@@ -111,7 +111,7 @@ public class PlainAccessValidator implements AccessValidator {
} catch (Throwable t) { } catch (Throwable t) {
throw new AclException(t.getMessage(), t); throw new AclException(t.getMessage(), t);
} }
// content // Content
SortedMap<String, String> map = new TreeMap<String, String>(); SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) { for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) { if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
......
...@@ -119,7 +119,7 @@ public class PlainPermissionLoader { ...@@ -119,7 +119,7 @@ public class PlainPermissionLoader {
Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap(); Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
if (needCheckedPermMap == null) { if (needCheckedPermMap == null) {
//if the needCheckedPermMap is null,then return // If the needCheckedPermMap is null,then return
return; return;
} }
...@@ -129,7 +129,7 @@ public class PlainPermissionLoader { ...@@ -129,7 +129,7 @@ public class PlainPermissionLoader {
boolean isGroup = PlainAccessResource.isRetryTopic(resource); boolean isGroup = PlainAccessResource.isRetryTopic(resource);
if (!ownedPermMap.containsKey(resource)) { if (!ownedPermMap.containsKey(resource)) {
//Check the default perm // Check the default perm
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() : byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm(); needCheckedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) { if (!Permission.checkPermission(neededPerm, ownedPerm)) {
...@@ -178,7 +178,7 @@ public class PlainPermissionLoader { ...@@ -178,7 +178,7 @@ public class PlainPermissionLoader {
public void validate(PlainAccessResource plainAccessResource) { public void validate(PlainAccessResource plainAccessResource) {
//Step 1, check the global white remote addr // Check the global white remote addr
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
if (remoteAddressStrategy.match(plainAccessResource)) { if (remoteAddressStrategy.match(plainAccessResource)) {
return; return;
...@@ -193,18 +193,18 @@ public class PlainPermissionLoader { ...@@ -193,18 +193,18 @@ public class PlainPermissionLoader {
throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
} }
//Step 2, check the white addr for accesskey // Check the white addr for accesskey
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
return; return;
} }
//Step 3, check the signature // Check the signature
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) { if (!signature.equals(plainAccessResource.getSignature())) {
throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
} }
//Step 4, check perm of each resource // Check perm of each resource
checkPerm(plainAccessResource, ownedAccess); checkPerm(plainAccessResource, ownedAccess);
} }
......
...@@ -18,5 +18,5 @@ package org.apache.rocketmq.acl.plain; ...@@ -18,5 +18,5 @@ package org.apache.rocketmq.acl.plain;
public interface RemoteAddressStrategy { public interface RemoteAddressStrategy {
public boolean match(PlainAccessResource plainAccessResource); boolean match(PlainAccessResource plainAccessResource);
} }
...@@ -270,7 +270,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -270,7 +270,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* *
* @param consumerGroup Consume queue. * @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command. * @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm. * @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) { AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
...@@ -280,13 +280,13 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -280,13 +280,13 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
} }
/** /**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm. * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
* *
* @param consumerGroup Consume queue. * @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command. * @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm. * @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic the name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
...@@ -317,21 +317,21 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -317,21 +317,21 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/** /**
* Constructor specifying consumer group. * Constructor specifying consumer group and enabled msg trace flag.
* *
* @param consumerGroup Consumer group. * @param consumerGroup Consumer group.
* @param enableMsgTrace switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) { public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null); this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
} }
/** /**
* Constructor specifying consumer group. * Constructor specifying consumer group, enabled msg trace flag and customized trace topic name.
* *
* @param consumerGroup Consumer group. * @param consumerGroup Consumer group.
* @param enableMsgTrace switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic the name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
......
...@@ -150,12 +150,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -150,12 +150,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} }
/** /**
* Constructor specifying both producer group and RPC hook. * Constructor specifying producer group, RPC hook, enabled msgTrace flag and customized trace topic name.
* *
* @param producerGroup Producer group, see the name-sake field. * @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution. * @param rpcHook RPC hook to execute per each remoting command execution.
* @param enableMsgTrace switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic the name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
...@@ -184,10 +184,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -184,10 +184,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} }
/** /**
* Constructor specifying producer group. * Constructor specifying producer group and enabled msg trace flag.
* *
* @param producerGroup Producer group, see the name-sake field. * @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
*/ */
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) { public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
this(producerGroup, null, enableMsgTrace, null); this(producerGroup, null, enableMsgTrace, null);
...@@ -195,11 +195,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -195,11 +195,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/** /**
* Constructor specifying producer group. * Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
* *
* @param producerGroup Producer group, see the name-sake field. * @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic the name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(producerGroup, null, enableMsgTrace, customizedTraceTopic); this(producerGroup, null, enableMsgTrace, customizedTraceTopic);
......
...@@ -58,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -58,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private final int maxMsgSize; private final int maxMsgSize;
private final DefaultMQProducer traceProducer; private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecuter; private final ThreadPoolExecutor traceExecuter;
// the last discard number of log // The last discard number of log
private AtomicLong discardCount; private AtomicLong discardCount;
private Thread worker; private Thread worker;
private ArrayBlockingQueue<TraceContext> traceContextQueue; private ArrayBlockingQueue<TraceContext> traceContextQueue;
...@@ -143,7 +143,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -143,7 +143,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME); traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME);
traceProducerInstance.setVipChannelEnabled(false); traceProducerInstance.setVipChannelEnabled(false);
//the max size of message is 128K // The max size of message is 128K
traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
} }
return traceProducerInstance; return traceProducerInstance;
...@@ -160,7 +160,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -160,7 +160,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
// the maximum waiting time for refresh,avoid being written all the time, resulting in failure to return. // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500; long end = System.currentTimeMillis() + 500;
while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) { while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
try { try {
...@@ -263,9 +263,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -263,9 +263,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
if (context.getTraceBeans().isEmpty()) { if (context.getTraceBeans().isEmpty()) {
continue; continue;
} }
//1.topic value corresponding to original message entity content // Topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic(); String topic = context.getTraceBeans().get(0).getTopic();
//2.use original message entity's topic as key // Use original message entity's topic as key
String key = topic; String key = topic;
List<TraceTransferBean> transBeanList = transBeanMap.get(key); List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) { if (transBeanList == null) {
...@@ -281,26 +281,26 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -281,26 +281,26 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} }
/** /**
* batch sending data actually * Batch sending data actually
*/ */
private void flushData(List<TraceTransferBean> transBeanList) { private void flushData(List<TraceTransferBean> transBeanList) {
if (transBeanList.size() == 0) { if (transBeanList.size() == 0) {
return; return;
} }
// temporary buffer // Temporary buffer
StringBuilder buffer = new StringBuilder(1024); StringBuilder buffer = new StringBuilder(1024);
int count = 0; int count = 0;
Set<String> keySet = new HashSet<String>(); Set<String> keySet = new HashSet<String>();
for (TraceTransferBean bean : transBeanList) { for (TraceTransferBean bean : transBeanList) {
// keyset of message trace includes msgId of or original message // Keyset of message trace includes msgId of or original message
keySet.addAll(bean.getTransKey()); keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData()); buffer.append(bean.getTransData());
count++; count++;
// Ensure that the size of the package should not exceed the upper limit. // Ensure that the size of the package should not exceed the upper limit.
if (buffer.length() >= traceProducer.getMaxMessageSize()) { if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString()); sendTraceDataByMQ(keySet, buffer.toString());
// clear temporary buffer after finishing // Clear temporary buffer after finishing
buffer.delete(0, buffer.length()); buffer.delete(0, buffer.length());
keySet.clear(); keySet.clear();
count = 0; count = 0;
...@@ -313,7 +313,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -313,7 +313,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} }
/** /**
* send message trace data * Send message trace data
* *
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch * @param data the message trace data in this batch
...@@ -322,7 +322,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -322,7 +322,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
String topic = traceTopicName; String topic = traceTopicName;
final Message message = new Message(topic, data.getBytes()); final Message message = new Message(topic, data.getBytes());
//keyset of message trace includes msgId of or original message // Keyset of message trace includes msgId of or original message
message.setKeys(keySet); message.setKeys(keySet);
try { try {
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
...@@ -337,7 +337,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -337,7 +337,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} }
}; };
if (traceBrokerSet.isEmpty()) { if (traceBrokerSet.isEmpty()) {
//no cross set // No cross set
traceProducer.send(message, callback, 5000); traceProducer.send(message, callback, 5000);
} else { } else {
traceProducer.send(message, new MessageQueueSelector() { traceProducer.send(message, new MessageQueueSelector() {
......
...@@ -22,12 +22,12 @@ import java.util.ArrayList; ...@@ -22,12 +22,12 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* encode/decode for Trace Data * Encode/decode for Trace Data
*/ */
public class TraceDataEncoder { public class TraceDataEncoder {
/** /**
* resolving traceContext list From trace data String * Resolving traceContext list From trace data String
* *
* @param traceData * @param traceData
* @return * @return
......
...@@ -30,21 +30,21 @@ public interface TraceDispatcher { ...@@ -30,21 +30,21 @@ public interface TraceDispatcher {
void start(String nameSrvAddr) throws MQClientException; void start(String nameSrvAddr) throws MQClientException;
/** /**
* append the transfering data * Append the transfering data
* @param ctx data infomation * @param ctx data infomation
* @return * @return
*/ */
boolean append(Object ctx); boolean append(Object ctx);
/** /**
* write flush action * Write flush action
* *
* @throws IOException * @throws IOException
*/ */
void flush() throws IOException; void flush() throws IOException;
/** /**
* close the trace Hook * Close the trace Hook
*/ */
void shutdown(); void shutdown();
} }
...@@ -20,7 +20,7 @@ import java.util.HashSet; ...@@ -20,7 +20,7 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
/** /**
* trace transfering bean * Trace transfering bean
*/ */
public class TraceTransferBean { public class TraceTransferBean {
private String transData; private String transData;
......
...@@ -61,7 +61,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -61,7 +61,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH); String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
if (traceOn != null && traceOn.equals("false")) { if (traceOn != null && traceOn.equals("false")) {
// if trace switch is false ,skip it // If trace switch is false ,skip it
continue; continue;
} }
TraceBean traceBean = new TraceBean(); TraceBean traceBean = new TraceBean();
...@@ -90,7 +90,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -90,7 +90,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext(); TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// if subbefore bean is null ,skip it // If subbefore bean is null ,skip it
return; return;
} }
TraceContext subAfterContext = new TraceContext(); TraceContext subAfterContext = new TraceContext();
...@@ -100,7 +100,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -100,7 +100,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
subAfterContext.setRequestId(subBeforeContext.getRequestId());// subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());// subAfterContext.setSuccess(context.isSuccess());//
//caculate the cost time for processing messages // Caculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);// subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans()); subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
......
...@@ -84,7 +84,7 @@ public class AclClient { ...@@ -84,7 +84,7 @@ public class AclClient {
consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*"); consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800 // Wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20180422221800"); consumer.setConsumeTimestamp("20180422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
......
...@@ -28,11 +28,11 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -28,11 +28,11 @@ import org.apache.rocketmq.common.message.MessageExt;
public class TracePushConsumer { public class TracePushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException { public static void main(String[] args) throws InterruptedException, MQClientException {
//here,we use the default message track trace topic name // Here,we use the default message track trace topic name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
consumer.subscribe("TopicTest", "*"); consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800 // Wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800"); consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
......
...@@ -216,9 +216,6 @@ ...@@ -216,9 +216,6 @@
<execution> <execution>
<id>generate-effective-dependencies-pom</id> <id>generate-effective-dependencies-pom</id>
<phase>generate-resources</phase> <phase>generate-resources</phase>
<!-- <goals>
<goal>effective-pom</goal>
</goals> -->
<configuration> <configuration>
<output>${project.build.directory}/effective-pom/effective-dependencies.xml</output> <output>${project.build.directory}/effective-pom/effective-dependencies.xml</output>
</configuration> </configuration>
......
...@@ -256,7 +256,7 @@ public class MQAdminStartup { ...@@ -256,7 +256,7 @@ public class MQAdminStartup {
System.out.printf(" Cannot find conf file %s, acl is not be enabled.%n" ,fileHome + fileName); System.out.printf(" Cannot find conf file %s, acl is not be enabled.%n" ,fileHome + fileName);
return null; return null;
} }
// admin ak sk
String accessKey = yamlDataObject.getString("accessKey"); String accessKey = yamlDataObject.getString("accessKey");
String secretKey = yamlDataObject.getString("secretKey"); String secretKey = yamlDataObject.getString("secretKey");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册