提交 47d88147 编写于 作者: D duhenglucky 提交者: ShannonDing

(1) Polish message trace target channel (2) Fix the issue that consume message...

(1) Polish message trace target channel (2) Fix the issue that consume message with namespace trace cannot found
上级 08b62fca
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client;
public enum AccessChannel {
local,
cloud,
}
...@@ -37,6 +37,8 @@ public class ClientConfig { ...@@ -37,6 +37,8 @@ public class ClientConfig {
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace; protected String namespace;
protected AccessChannel accessChannel = AccessChannel.local;
/** /**
* Pulling topic information interval from the named server * Pulling topic information interval from the named server
*/ */
...@@ -263,6 +265,14 @@ public class ClientConfig { ...@@ -263,6 +265,14 @@ public class ClientConfig {
this.namespace = namespace; this.namespace = namespace;
} }
public AccessChannel getAccessChannel() {
return this.accessChannel;
}
public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
}
@Override @Override
public String toString() { public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
......
...@@ -693,7 +693,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -693,7 +693,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.start(); this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) { if (null != traceDispatcher) {
try { try {
traceDispatcher.start(this.getNamesrvAddr()); traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e); log.warn("trace dispatcher start failed ", e);
} }
......
...@@ -238,15 +238,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -238,15 +238,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} }
/** /**
* Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic name. * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
* name.
* *
* @param namespace Namespace for this MQ Producer instance. * @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field. * @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution. * @param rpcHook RPC hook to execute per each remoting command execution.
* @param enableMsgTrace Switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/ */
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace; this.namespace = namespace;
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
...@@ -282,7 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -282,7 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.start(); this.defaultMQProducerImpl.start();
if (null != traceDispatcher) { if (null != traceDispatcher) {
try { try {
traceDispatcher.start(this.getNamesrvAddr()); traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e); log.warn("trace dispatcher start failed ", e);
} }
...@@ -331,7 +334,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -331,7 +334,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override @Override
public SendResult send( public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg,this); Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic())); msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg); return this.defaultMQProducerImpl.send(msg);
} }
......
...@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; ...@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
...@@ -57,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -57,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private final int batchSize; private final int batchSize;
private final int maxMsgSize; private final int maxMsgSize;
private final DefaultMQProducer traceProducer; private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecuter; private final ThreadPoolExecutor traceExecutor;
// The last discard number of log // The last discard number of log
private AtomicLong discardCount; private AtomicLong discardCount;
private Thread worker; private Thread worker;
...@@ -71,8 +72,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -71,8 +72,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String dispatcherId = UUID.randomUUID().toString(); private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName; private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false); private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.local;
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value // queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048; this.queueSize = 2048;
this.batchSize = 100; this.batchSize = 100;
...@@ -85,7 +87,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -85,7 +87,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} else { } else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
} }
this.traceExecuter = new ThreadPoolExecutor(// this.traceExecutor = new ThreadPoolExecutor(//
10, // 10, //
20, // 20, //
1000 * 60, // 1000 * 60, //
...@@ -95,6 +97,14 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -95,6 +97,14 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
traceProducer = getAndCreateTraceProducer(rpcHook); traceProducer = getAndCreateTraceProducer(rpcHook);
} }
public AccessChannel getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
}
public String getTraceTopicName() { public String getTraceTopicName() {
return traceTopicName; return traceTopicName;
} }
...@@ -123,12 +133,13 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -123,12 +133,13 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.hostConsumer = hostConsumer; this.hostConsumer = hostConsumer;
} }
public void start(String nameSrvAddr) throws MQClientException { public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start(); traceProducer.start();
} }
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true); this.worker.setDaemon(true);
this.worker.start(); this.worker.start();
...@@ -174,7 +185,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -174,7 +185,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
@Override @Override
public void shutdown() { public void shutdown() {
this.stopped = true; this.stopped = true;
this.traceExecuter.shutdown(); this.traceExecutor.shutdown();
if (isStarted.get()) { if (isStarted.get()) {
traceProducer.shutdown(); traceProducer.shutdown();
} }
...@@ -231,7 +242,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -231,7 +242,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} }
if (contexts.size() > 0) { if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecuter.submit(request); traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) { } else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true; this.stopped = true;
} }
...@@ -330,11 +341,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -330,11 +341,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
*/ */
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) { private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName; String traceTopic = traceTopicName;
if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) { if (AccessChannel.cloud == accessChannel){
traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId; traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
} }
final Message message = new Message(traceTopic, data.getBytes()); final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of or original message // Keyset of message trace includes msgId of or original message
message.setKeys(keySet); message.setKeys(keySet);
try { try {
...@@ -342,6 +352,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -342,6 +352,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
SendCallback callback = new SendCallback() { SendCallback callback = new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
} }
@Override @Override
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException; import java.io.IOException;
...@@ -27,7 +28,7 @@ public interface TraceDispatcher { ...@@ -27,7 +28,7 @@ public interface TraceDispatcher {
/** /**
* Initialize asynchronous transfer data module * Initialize asynchronous transfer data module
*/ */
void start(String nameSrvAddr) throws MQClientException; void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
/** /**
* Append the transfering data * Append the transfering data
......
...@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
...@@ -51,7 +52,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -51,7 +52,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext traceContext = new TraceContext(); TraceContext traceContext = new TraceContext();
context.setMqTraceContext(traceContext); context.setMqTraceContext(traceContext);
traceContext.setTraceType(TraceType.SubBefore);// traceContext.setTraceType(TraceType.SubBefore);//
traceContext.setGroupName(context.getConsumerGroup());// traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
List<TraceBean> beans = new ArrayList<TraceBean>(); List<TraceBean> beans = new ArrayList<TraceBean>();
for (MessageExt msg : context.getMsgList()) { for (MessageExt msg : context.getMsgList()) {
if (msg == null) { if (msg == null) {
...@@ -65,7 +66,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -65,7 +66,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
continue; continue;
} }
TraceBean traceBean = new TraceBean(); TraceBean traceBean = new TraceBean();
traceBean.setTopic(msg.getTopic());// traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
traceBean.setMsgId(msg.getMsgId());// traceBean.setMsgId(msg.getMsgId());//
traceBean.setTags(msg.getTags());// traceBean.setTags(msg.getTags());//
traceBean.setKeys(msg.getKeys());// traceBean.setKeys(msg.getKeys());//
...@@ -96,7 +97,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -96,7 +97,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext subAfterContext = new TraceContext(); TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);// subAfterContext.setTraceType(TraceType.SubAfter);//
subAfterContext.setRegionId(subBeforeContext.getRegionId());// subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(subBeforeContext.getGroupName());// subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
subAfterContext.setRequestId(subBeforeContext.getRequestId());// subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());// subAfterContext.setSuccess(context.isSuccess());//
......
...@@ -16,15 +16,16 @@ ...@@ -16,15 +16,16 @@
*/ */
package org.apache.rocketmq.client.trace.hook; package org.apache.rocketmq.client.trace.hook;
import java.util.ArrayList;
import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.client.trace.TraceType;
import java.util.ArrayList; import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class SendMessageTraceHookImpl implements SendMessageHook { public class SendMessageTraceHookImpl implements SendMessageHook {
...@@ -50,10 +51,10 @@ public class SendMessageTraceHookImpl implements SendMessageHook { ...@@ -50,10 +51,10 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1)); tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext); context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(context.getProducerGroup()); tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
//build the data bean object of message trace //build the data bean object of message trace
TraceBean traceBean = new TraceBean(); TraceBean traceBean = new TraceBean();
traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags()); traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys()); traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr()); traceBean.setStoreHost(context.getBrokerAddr());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册