提交 d2cafcf6 编写于 作者: D dongeforever

Fix locktime and group

上级 b3ba2668
......@@ -60,7 +60,7 @@ public class CommitLog {
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
protected volatile long beginTimeInLock = 0;
private volatile long beginTimeInLock = 0;
protected final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
......
......@@ -54,6 +54,7 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
*/
public class DLegerCommitLog extends CommitLog {
private final DLegerServer dLegerServer;
private final DLegerConfig dLegerConfig;
private final DLegerMmapFileStore dLegerFileStore;
private final MmapFileList dLegerFileList;
......@@ -61,11 +62,11 @@ public class DLegerCommitLog extends CommitLog {
private final MessageSerializer messageSerializer;
private volatile long beginTimeInLock = 0;
private volatile long beginTimeInDlegerLock = 0;
public DLegerCommitLog(final DefaultMessageStore defaultMessageStore) {
super(defaultMessageStore);
DLegerConfig dLegerConfig = new DLegerConfig();
dLegerConfig = new DLegerConfig();
dLegerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
dLegerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
dLegerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
......@@ -257,8 +258,9 @@ public class DLegerCommitLog extends CommitLog {
return false;
}
@Override
public long getBeginTimeInLock() {
return beginTimeInLock;
return beginTimeInDlegerLock;
}
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
......@@ -305,7 +307,7 @@ public class DLegerCommitLog extends CommitLog {
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long queueOffset = -1;
try {
beginTimeInLock = this.defaultMessageStore.getSystemClock().now();
beginTimeInDlegerLock = this.defaultMessageStore.getSystemClock().now();
//TO DO use buffer
encodeResult = this.messageSerializer.serialize(msg);
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
......@@ -319,6 +321,7 @@ public class DLegerCommitLog extends CommitLog {
}
} else {
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLegerConfig.getGroup());
request.setRemoteId(dLegerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dlegerFuture = dLegerServer.handleAppend(request);
......@@ -340,12 +343,12 @@ public class DLegerCommitLog extends CommitLog {
}
}
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInLock;
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDlegerLock;
} catch (Exception e) {
log.error("Put message error", e);
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} finally {
beginTimeInLock = 0;
beginTimeInDlegerLock = 0;
putMessageLock.unlock();
}
......@@ -476,9 +479,10 @@ public class DLegerCommitLog extends CommitLog {
dLegerFileList.checkSelf();
}
@Override
public long lockTimeMills() {
long diff = 0;
long begin = this.beginTimeInLock;
long begin = this.beginTimeInDlegerLock;
if (begin > 0) {
diff = this.defaultMessageStore.now() - begin;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册