From 767775838fbbf556e6ebc899f4ccab7f148d7aae Mon Sep 17 00:00:00 2001 From: lizhanhui Date: Fri, 10 Feb 2017 17:55:04 +0800 Subject: [PATCH] [ROCKETMQ-74] Fix DataVersion equals defect, closes apache/incubator-rocketmq#50 --- .../apache/rocketmq/common/DataVersion.java | 30 +++++--- .../rocketmq/common/DataVersionTest.java | 70 +++++++++++++++++++ .../processor/DefaultRequestProcessor.java | 4 +- 3 files changed, 91 insertions(+), 13 deletions(-) create mode 100644 common/src/test/java/org/apache/rocketmq/common/DataVersionTest.java diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java index 0f42e3ff..e47a9b38 100644 --- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java +++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java @@ -20,25 +20,25 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class DataVersion extends RemotingSerializable { - private long timestatmp = System.currentTimeMillis(); + private long timestamp = System.currentTimeMillis(); private AtomicLong counter = new AtomicLong(0); public void assignNewOne(final DataVersion dataVersion) { - this.timestatmp = dataVersion.timestatmp; + this.timestamp = dataVersion.timestamp; this.counter.set(dataVersion.counter.get()); } public void nextVersion() { - this.timestatmp = System.currentTimeMillis(); + this.timestamp = System.currentTimeMillis(); this.counter.incrementAndGet(); } - public long getTimestatmp() { - return timestatmp; + public long getTimestamp() { + return timestamp; } - public void setTimestatmp(long timestatmp) { - this.timestatmp = timestatmp; + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; } public AtomicLong getCounter() { @@ -58,16 +58,24 @@ public class DataVersion extends RemotingSerializable { final DataVersion that = (DataVersion) o; - if (timestatmp != that.timestatmp) + if (timestamp != that.timestamp) { return false; - return counter != null ? counter.equals(that.counter) : that.counter == null; + } + if (counter != null && that.counter != null) { + return counter.longValue() == that.counter.longValue(); + } + + return (null == counter) && (null == that.counter); } @Override public int hashCode() { - int result = (int) (timestatmp ^ (timestatmp >>> 32)); - result = 31 * result + (counter != null ? counter.hashCode() : 0); + int result = (int) (timestamp ^ (timestamp >>> 32)); + if (null != counter) { + long l = counter.get(); + result = 31 * result + (int)(l ^ (l >>> 32)); + } return result; } } diff --git a/common/src/test/java/org/apache/rocketmq/common/DataVersionTest.java b/common/src/test/java/org/apache/rocketmq/common/DataVersionTest.java new file mode 100644 index 00000000..f4d14e56 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/DataVersionTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.util.concurrent.atomic.AtomicLong; +import org.junit.Assert; +import org.junit.Test; + +public class DataVersionTest { + + @Test + public void testEquals() { + DataVersion dataVersion = new DataVersion(); + DataVersion other = new DataVersion(); + other.setTimestamp(dataVersion.getTimestamp()); + Assert.assertTrue(dataVersion.equals(other)); + } + + @Test + public void testEquals_falseWhenCounterDifferent() { + DataVersion dataVersion = new DataVersion(); + DataVersion other = new DataVersion(); + other.setCounter(new AtomicLong(1L)); + other.setTimestamp(dataVersion.getTimestamp()); + Assert.assertFalse(dataVersion.equals(other)); + } + + @Test + public void testEquals_falseWhenCounterDifferent2() { + DataVersion dataVersion = new DataVersion(); + DataVersion other = new DataVersion(); + other.setCounter(null); + other.setTimestamp(dataVersion.getTimestamp()); + Assert.assertFalse(dataVersion.equals(other)); + } + + @Test + public void testEquals_falseWhenCounterDifferent3() { + DataVersion dataVersion = new DataVersion(); + dataVersion.setCounter(null); + DataVersion other = new DataVersion(); + other.setTimestamp(dataVersion.getTimestamp()); + Assert.assertFalse(dataVersion.equals(other)); + } + + @Test + public void testEquals_trueWhenCountersBothNull() { + DataVersion dataVersion = new DataVersion(); + dataVersion.setCounter(null); + DataVersion other = new DataVersion(); + other.setCounter(null); + other.setTimestamp(dataVersion.getTimestamp()); + Assert.assertTrue(dataVersion.equals(other)); + } +} \ No newline at end of file diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index b6db7e03..96476842 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -191,7 +191,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class); } else { registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0)); - registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0); + registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0); } RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( @@ -227,7 +227,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { } else { topicConfigWrapper = new TopicConfigSerializeWrapper(); topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0)); - topicConfigWrapper.getDataVersion().setTimestatmp(0); + topicConfigWrapper.getDataVersion().setTimestamp(0); } RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( -- GitLab