提交 ce553b71 编写于 作者: R Rajan 提交者: Matteo Merli

Pass client library version to broker and show on stats (#387)

上级 c4d2e8bf
......@@ -110,6 +110,7 @@ public class Consumer {
stats.address = cnx.clientAddress().toString();
stats.consumerName = consumerName;
stats.connectedSince = DATE_FORMAT.format(Instant.now());
stats.clientVersion = cnx.getClientVersion();
if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentOpenHashMap<PositionImpl, Integer>(256, 2);
......
......@@ -75,6 +75,7 @@ public class Producer {
this.stats = new PublisherStats();
stats.address = cnx.clientAddress().toString();
stats.connectedSince = DATE_FORMAT.format(Instant.now());
stats.clientVersion = cnx.getClientVersion();
stats.producerName = producerName;
stats.producerId = producerId;
......
......@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -82,6 +83,7 @@ public class ServerCnx extends PulsarHandler {
private static final int ResumeReadsThreshold = MaxPendingSendRequests / 2;
private int pendingSendRequest = 0;
private final String replicatorPrefix;
private String clientVersion = null;
enum State {
Start, Connected
......@@ -314,6 +316,10 @@ public class ServerCnx extends PulsarHandler {
ctx.writeAndFlush(Commands.newConnected(connect));
state = State.Connected;
remoteEndpointProtocolVersion = connect.getProtocolVersion();
String version = connect.hasClientVersion() ? connect.getClientVersion() : null;
if (isNotBlank(version) && !version.contains(" ") /* ignore default version: pulsar client */) {
this.clientVersion = version;
}
}
@Override
......@@ -858,4 +864,8 @@ public class ServerCnx extends PulsarHandler {
public boolean isBatchMessageCompatibleVersion() {
return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber();
}
public String getClientVersion() {
return clientVersion;
}
}
......@@ -903,6 +903,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
destStatsStream.writePair("blockedConsumerOnUnackedMsgs",
consumerStats.blockedConsumerOnUnackedMsgs);
}
if (consumerStats.clientVersion != null) {
destStatsStream.writePair("clientVersion", consumerStats.clientVersion);
}
destStatsStream.endObject();
}
......
......@@ -163,6 +163,7 @@ public class BrokerServiceTest extends BrokerTestBase {
assertTrue(stats.publishers.get(0).msgRateIn > 0.0);
assertTrue(stats.publishers.get(0).msgThroughputIn > 0.0);
assertTrue(stats.publishers.get(0).averageMsgSize > 0.0);
assertNotNull(stats.publishers.get(0).clientVersion);
// aggregated publish stats
assertEquals(stats.msgRateIn, stats.publishers.get(0).msgRateIn);
......@@ -179,6 +180,7 @@ public class BrokerServiceTest extends BrokerTestBase {
assertEquals(subStats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertNotNull(subStats.consumers.get(0).clientVersion);
Message msg;
for (int i = 0; i < 10; i++) {
......
......@@ -177,7 +177,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect("none", "");
ByteBuf clientCommand = Commands.newConnect("none", "", null);
channel.writeInbound(clientCommand);
assertEquals(serverCnx.getState(), State.Connected);
......@@ -207,7 +207,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect("none", "");
ByteBuf clientCommand = Commands.newConnect("none", "", null);
channel.writeInbound(clientCommand);
assertEquals(serverCnx.getState(), State.Connected);
......@@ -223,7 +223,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect("none", "");
ByteBuf clientCommand = Commands.newConnect("none", "", null);
channel.writeInbound(clientCommand);
assertEquals(serverCnx.getState(), State.Connected);
......@@ -248,7 +248,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect("none", "", ProtocolVersion.v0.getNumber());
ByteBuf clientCommand = Commands.newConnect("none", "", ProtocolVersion.v0.getNumber(), null);
channel.writeInbound(clientCommand);
assertEquals(serverCnx.getState(), State.Connected);
......@@ -298,7 +298,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect("none", "");
ByteBuf clientCommand = Commands.newConnect("none", "", null);
channel.writeInbound(clientCommand);
assertEquals(serverCnx.getState(), State.Connected);
......@@ -320,7 +320,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect("none", "");
ByteBuf clientCommand = Commands.newConnect("none", "", null);
channel.writeInbound(clientCommand);
assertEquals(serverCnx.getState(), State.Start);
......
......@@ -30,6 +30,7 @@ import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import com.yahoo.pulsar.common.api.Commands;
import static com.yahoo.pulsar.client.impl.HttpClient.getPulsarClientVersion;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
......@@ -93,7 +94,7 @@ public class ClientCnx extends PulsarHandler {
authData = authentication.getAuthData().getCommandData();
}
// Send CONNECT command
ctx.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData)).addListener(future -> {
ctx.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData, getPulsarClientVersion())).addListener(future -> {
if (future.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug("Complete: {}", future.isSuccess());
......
......@@ -177,7 +177,7 @@ public class HttpClient implements Closeable {
*
* @return client version or unknown version depending on whether the file is found or not.
*/
private static String getPulsarClientVersion() {
public static String getPulsarClientVersion() {
String path = "/pulsar-client-version.properties";
String unknownClientIdentifier = "UnknownClient";
......
......@@ -77,15 +77,15 @@ public class Commands {
public static final short magicCrc32c = 0x0e01;
private static final int checksumSize = 4;
public static ByteBuf newConnect(String authMethodName, String authData) {
return newConnect(authMethodName, authData, getCurrentProtocolVersion());
public static ByteBuf newConnect(String authMethodName, String authData, String libVersion) {
return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion);
}
public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion) {
public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion) {
CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
connectBuilder.setClientVersion("Pulsar Client");
connectBuilder.setClientVersion(libVersion != null ? libVersion : "Pulsar Client");
connectBuilder.setAuthMethodName(authMethodName);
if ("ycav1".equals(authMethodName)) {
// Handle the case of a client that gets updated before the broker and starts sending the string auth method
// name. An example would be in broker-to-broker replication. We need to make sure the clients are still
......
......@@ -46,6 +46,9 @@ public class ConsumerStats {
/** Timestamp of connection */
public String connectedSince;
/** Client library version */
public String clientVersion;
public ConsumerStats add(ConsumerStats stats) {
checkNotNull(stats);
......
......@@ -40,6 +40,9 @@ public class PublisherStats {
/** Timestamp of connection */
public String connectedSince;
/** Client library version */
public String clientVersion;
public PublisherStats add(PublisherStats stats) {
checkNotNull(stats);
......
......@@ -218,7 +218,7 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(Commands.newConnect("", ""));
ctx.writeAndFlush(Commands.newConnect("", "", null));
latch.countDown();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册