Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
b81e90f0
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
b81e90f0
编写于
5月 20, 2021
作者:
Z
zhangjidi2016
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'upstream/develop' into develop
上级
e47f6557
432236e9
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
29 addition
and
421 deletion
+29
-421
common/src/test/java/org/apache/rocketmq/common/UtilAllTests.java
...rc/test/java/org/apache/rocketmq/common/UtilAllTests.java
+0
-381
pom.xml
pom.xml
+0
-11
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
...a/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+29
-29
未找到文件。
common/src/test/java/org/apache/rocketmq/common/UtilAllTests.java
已删除
100644 → 0
浏览文件 @
e47f6557
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common
;
import
java.io.File
;
import
java.io.IOException
;
import
java.lang.management.ManagementFactory
;
import
java.lang.management.RuntimeMXBean
;
import
java.net.InetAddress
;
import
java.net.NetworkInterface
;
import
java.util.Calendar
;
import
java.util.Date
;
import
java.util.Enumeration
;
import
org.junit.Assert
;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.junit.rules.ExpectedException
;
import
org.junit.rules.TemporaryFolder
;
import
org.junit.runner.RunWith
;
import
org.powermock.api.mockito.PowerMockito
;
import
org.powermock.core.classloader.annotations.PowerMockIgnore
;
import
org.powermock.core.classloader.annotations.PrepareForTest
;
import
org.powermock.modules.junit4.PowerMockRunner
;
@RunWith
(
PowerMockRunner
.
class
)
@PowerMockIgnore
(
"javax.management.*"
)
public
class
UtilAllTests
{
@Rule
private
TemporaryFolder
folder
=
new
TemporaryFolder
();
@Rule
private
ExpectedException
thrown
=
ExpectedException
.
none
();
@PrepareForTest
({
RuntimeMXBean
.
class
,
UtilAll
.
class
,
ManagementFactory
.
class
})
@Test
public
void
testGetPidIfNotNull
()
{
PowerMockito
.
mockStatic
(
ManagementFactory
.
class
);
RuntimeMXBean
runtimeMXBean
=
PowerMockito
.
mock
(
RuntimeMXBean
.
class
);
PowerMockito
.
when
(
runtimeMXBean
.
getName
())
.
thenReturn
(
"40@abc.com"
);
PowerMockito
.
doReturn
(
runtimeMXBean
).
when
(
ManagementFactory
.
class
);
ManagementFactory
.
getRuntimeMXBean
();
Assert
.
assertEquals
(
40
,
UtilAll
.
getPid
());
}
@PrepareForTest
({
RuntimeMXBean
.
class
,
UtilAll
.
class
,
ManagementFactory
.
class
})
@Test
public
void
testGetPidIfNull
()
{
PowerMockito
.
mockStatic
(
ManagementFactory
.
class
);
RuntimeMXBean
runtimeMXBean
=
PowerMockito
.
mock
(
RuntimeMXBean
.
class
);
PowerMockito
.
when
(
runtimeMXBean
.
getName
()).
thenReturn
(
null
);
PowerMockito
.
doReturn
(
runtimeMXBean
).
when
(
ManagementFactory
.
class
);
ManagementFactory
.
getRuntimeMXBean
();
Assert
.
assertEquals
(-
1
,
UtilAll
.
getPid
());
}
@PrepareForTest
({
UtilAll
.
class
})
@Test
public
void
testCurrentStackTraceIfNotNull
()
{
PowerMockito
.
mockStatic
(
Thread
.
class
);
StackTraceElement
stackTraceElement
=
new
StackTraceElement
(
"Bar"
,
"foo"
,
"baz"
,
101
);
PowerMockito
.
when
(
Thread
.
currentThread
().
getStackTrace
())
.
thenReturn
(
new
StackTraceElement
[]{
stackTraceElement
});
Assert
.
assertEquals
(
"\n\tBar.foo(baz:101)"
,
UtilAll
.
currentStackTrace
());
}
@Test
public
void
testOffset2FileNameIfInputNotNull
()
{
Assert
.
assertEquals
(
"00000000000000000002"
,
UtilAll
.
offset2FileName
(
2
l
));
Assert
.
assertEquals
(
"00000001558787014000"
,
UtilAll
.
offset2FileName
(
1558787014000
l
));
}
@PrepareForTest
({
UtilAll
.
class
,
System
.
class
})
@Test
public
void
testComputeEclipseTimeMillisecondsIfInputNotNull
()
{
PowerMockito
.
mockStatic
(
System
.
class
);
PowerMockito
.
when
(
System
.
currentTimeMillis
())
.
thenReturn
(
1558787014000
l
);
Assert
.
assertEquals
(
2592000000
l
,
UtilAll
.
computeEclipseTimeMilliseconds
(
1556195014000
l
));
}
@PrepareForTest
({
Calendar
.
class
,
UtilAll
.
class
})
@Test
public
void
testIsItTimeToDoIfCurrentSetTime
()
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTimeInMillis
(
1558787014000
l
);
PowerMockito
.
mockStatic
(
Calendar
.
class
);
PowerMockito
.
when
(
Calendar
.
getInstance
()).
thenReturn
(
calendar
);
Assert
.
assertTrue
(
UtilAll
.
isItTimeToDo
(
"13;12;14"
));
Assert
.
assertFalse
(
UtilAll
.
isItTimeToDo
(
"00;00;00;"
));
}
@PrepareForTest
({
UtilAll
.
class
,
System
.
class
})
@Test
public
void
testTimeMillisToHumanStringIfCurrentTimeNotNull
()
{
PowerMockito
.
mockStatic
(
System
.
class
);
PowerMockito
.
when
(
System
.
currentTimeMillis
())
.
thenReturn
(
1558787014000
l
);
Assert
.
assertEquals
(
"20190525132334000"
,
UtilAll
.
timeMillisToHumanString
());
}
@Test
public
void
testTimeMillisToHumanStringLongIfInputNotNull
()
{
Assert
.
assertEquals
(
"20190525132334000"
,
UtilAll
.
timeMillisToHumanString
(
1558787014000
l
));
}
@PrepareForTest
({
System
.
class
,
UtilAll
.
class
})
@Test
public
void
testComputNextMorningTimeMillisIfCurrentTimeNotNull
()
{
PowerMockito
.
mockStatic
(
System
.
class
);
PowerMockito
.
when
(
System
.
currentTimeMillis
())
.
thenReturn
(
1515585600000L
);
Assert
.
assertEquals
(
1515628800000
l
,
UtilAll
.
computNextMorningTimeMillis
());
}
@PrepareForTest
({
System
.
class
,
UtilAll
.
class
})
@Test
public
void
testComputNextMinutesTimeMillisIfCurrentTimeNotNull
()
{
PowerMockito
.
mockStatic
(
System
.
class
);
PowerMockito
.
when
(
System
.
currentTimeMillis
())
.
thenReturn
(
1515585600000L
);
Assert
.
assertEquals
(
1515585660000
l
,
UtilAll
.
computNextMinutesTimeMillis
());
}
@PrepareForTest
({
System
.
class
,
UtilAll
.
class
})
@Test
public
void
testComputNextHourTimeMillisIfCurrentTimeNotNull
()
{
PowerMockito
.
mockStatic
(
System
.
class
);
PowerMockito
.
when
(
System
.
currentTimeMillis
())
.
thenReturn
(
1515585600000L
);
Assert
.
assertEquals
(
1515589200000
l
,
UtilAll
.
computNextHourTimeMillis
());
}
@PrepareForTest
({
UtilAll
.
class
,
System
.
class
})
@Test
public
void
testComputNextHalfHourTimeMillisIfCurrentTimeNotNull
()
{
PowerMockito
.
mockStatic
(
System
.
class
);
PowerMockito
.
when
(
System
.
currentTimeMillis
())
.
thenReturn
(
1515585600000L
);
Assert
.
assertEquals
(
1515591000000
l
,
UtilAll
.
computNextHalfHourTimeMillis
());
}
@Test
public
void
testTimeMillisToHumanString2IfInputNotNull
()
{
Assert
.
assertEquals
(
"2019-04-25 13:23:34,000"
,
UtilAll
.
timeMillisToHumanString2
(
1556195014000
l
));
}
@Test
public
void
testTimeMillisToHumanString3IfInputNotNull
()
{
Assert
.
assertEquals
(
"20190425132334"
,
UtilAll
.
timeMillisToHumanString3
(
1556195014000
l
));
}
@Test
public
void
testGetDiskPartitionSpaceUsedPercentIfFileDoesNotExist
()
{
Assert
.
assertEquals
(-
1
,
UtilAll
.
getDiskPartitionSpaceUsedPercent
(
""
),
0
);
Assert
.
assertEquals
(-
1
,
UtilAll
.
getDiskPartitionSpaceUsedPercent
(
null
),
0
);
Assert
.
assertEquals
(-
1
,
UtilAll
.
getDiskPartitionSpaceUsedPercent
(
"foo"
),
0
);
}
@Test
public
void
testGetDiskPartitionSpaceUsedPercentIfFileExists
()
throws
IOException
{
File
file
=
folder
.
newFile
(
"foo.txt"
);
try
{
Assert
.
assertNotNull
(
UtilAll
.
getDiskPartitionSpaceUsedPercent
(
file
.
getPath
()));
}
finally
{
file
.
delete
();
folder
.
delete
();
}
}
@Test
public
void
testCrc32IfInputNull
()
{
Assert
.
assertEquals
(
0
,
UtilAll
.
crc32
(
null
));
Assert
.
assertEquals
(
0
,
UtilAll
.
crc32
(
new
byte
[]{}));
}
@Test
public
void
testCrc32IfInputNotNull
()
{
Assert
.
assertEquals
(
1438416925
,
UtilAll
.
crc32
(
new
byte
[]{
1
,
2
,
3
}));
}
@Test
public
void
testBytes2stringIfInputArrayNotEmpty
()
{
Assert
.
assertEquals
(
"010203"
,
UtilAll
.
bytes2string
(
new
byte
[]{
1
,
2
,
3
}));
}
@Test
public
void
testBytes2stringIfInputEmptyArray
()
{
Assert
.
assertEquals
(
""
,
UtilAll
.
bytes2string
(
new
byte
[
0
]));
}
@Test
public
void
testString2bytesIfInputNullOrEmptyString
()
{
Assert
.
assertNull
(
UtilAll
.
string2bytes
(
""
));
Assert
.
assertNull
(
UtilAll
.
string2bytes
(
null
));
}
@Test
public
void
testString2bytesIfInputNotNullOrEmptyString
()
{
Assert
.
assertArrayEquals
(
new
byte
[]{-
1
},
UtilAll
.
string2bytes
(
"foo"
));
}
@Test
public
void
testUncompressIfInputNotNull
()
throws
IOException
{
byte
[]
by
=
new
byte
[]{
120
,
1
,
1
,
3
,
0
,
-
4
,
-
1
,
1
,
2
,
3
,
0
,
13
,
0
,
7
};
Assert
.
assertArrayEquals
(
new
byte
[]{
1
,
2
,
3
},
UtilAll
.
uncompress
(
by
));
}
@Test
public
void
testCompressIfInputNotNull
()
throws
IOException
{
byte
[]
by
=
new
byte
[]{
120
,
1
,
1
,
3
,
0
,
-
4
,
-
1
,
1
,
2
,
3
,
0
,
13
,
0
,
7
};
Assert
.
assertArrayEquals
(
by
,
UtilAll
.
compress
(
new
byte
[]{
1
,
2
,
3
},
0
));
}
@Test
public
void
testAsIntIfInputNotNull
()
{
Assert
.
assertEquals
(
1
,
UtilAll
.
asInt
(
"foo"
,
1
));
Assert
.
assertEquals
(
123
,
UtilAll
.
asInt
(
"123"
,
1
));
}
@Test
public
void
testAsLongIfInputNotNull
()
{
Assert
.
assertEquals
(
1
l
,
UtilAll
.
asLong
(
"foo"
,
1
l
));
Assert
.
assertEquals
(
1556195014000
l
,
UtilAll
.
asLong
(
"1556195014000"
,
1
l
));
}
@Test
public
void
testFormatDateIfInputNotNull
()
{
Assert
.
assertEquals
(
"2019-04-25 13:23:34"
,
UtilAll
.
formatDate
(
new
Date
(
1556195014000
l
),
"yyyy-MM-dd HH:mm:ss"
));
}
@Test
public
void
testParseDateIfInputNotNull
()
{
Assert
.
assertNull
(
UtilAll
.
parseDate
(
"2019-04-25 13:23:34"
,
""
));
Assert
.
assertEquals
(
new
Date
(
1556195014000
l
),
UtilAll
.
parseDate
(
"2019-04-25 13:23:34"
,
"yyyy-MM-dd HH:mm:ss"
));
}
@Test
public
void
testResponseCode2StringIfInputNotNull
()
{
Assert
.
assertEquals
(
"2"
,
UtilAll
.
responseCode2String
(
2
));
}
@Test
public
void
testFrontStringAtLeastIfInputNotNull
()
{
Assert
.
assertEquals
(
""
,
UtilAll
.
frontStringAtLeast
(
""
,
2
));
Assert
.
assertEquals
(
"fo"
,
UtilAll
.
frontStringAtLeast
(
"foo"
,
2
));
}
@Test
public
void
testIsBlankIfInputNullOrEmpty
()
{
Assert
.
assertTrue
(
UtilAll
.
isBlank
(
""
));
Assert
.
assertTrue
(
UtilAll
.
isBlank
(
" "
));
Assert
.
assertTrue
(
UtilAll
.
isBlank
(
null
));
}
@Test
public
void
testIsBlankIfInputNotNull
()
{
Assert
.
assertFalse
(
UtilAll
.
isBlank
(
"foo"
));
}
@Test
public
void
testJstackIfNotNull
()
{
Assert
.
assertNotNull
(
UtilAll
.
jstack
());
}
@Test
public
void
testIsInternalIPIfInputNotNull
()
{
Assert
.
assertFalse
(
UtilAll
.
isInternalIP
(
new
byte
[]{
1
,
1
,
1
,
1
}));
Assert
.
assertTrue
(
UtilAll
.
isInternalIP
(
new
byte
[]{
10
,
1
,
2
,
3
}));
Assert
.
assertTrue
(
UtilAll
.
isInternalIP
(
new
byte
[]{(
byte
)
172
,
16
,
2
,
3
}));
Assert
.
assertTrue
(
UtilAll
.
isInternalIP
(
new
byte
[]{(
byte
)
192
,
(
byte
)
168
,
2
,
3
}));
}
@Test
public
void
testIsInternalIPIfLengthLessThan4
()
{
thrown
.
expect
(
RuntimeException
.
class
);
UtilAll
.
isInternalIP
(
new
byte
[
0
]);
// Method is not expected to return due to exception thrown
}
@Test
public
void
testIpToIPv4StrIfInputNotNull
()
{
Assert
.
assertNull
(
UtilAll
.
ipToIPv4Str
(
new
byte
[]{
1
}));
Assert
.
assertEquals
(
"10.1.2.3"
,
UtilAll
.
ipToIPv4Str
(
new
byte
[]{
10
,
1
,
2
,
3
}));
Assert
.
assertEquals
(
"172.16.2.3"
,
UtilAll
.
ipToIPv4Str
(
new
byte
[]{(
byte
)
172
,
16
,
2
,
3
}));
Assert
.
assertEquals
(
"192.168.2.3"
,
UtilAll
.
ipToIPv4Str
(
new
byte
[]{(
byte
)
192
,
(
byte
)
168
,
2
,
3
}));
}
@Test
public
void
testGetIP1IfCurrentIPNotNull
()
{
InetAddress
inetAddress
=
PowerMockito
.
mock
(
InetAddress
.
class
);
PowerMockito
.
when
(
inetAddress
.
getAddress
())
.
thenReturn
(
new
byte
[]{
10
,
1
,
2
,
3
});
Assert
.
assertArrayEquals
(
new
byte
[]{-
84
,
17
,
0
,
1
},
UtilAll
.
getIP
());
}
@Test
public
void
testGetIP2IfCurrentIPNotNull
()
{
InetAddress
inetAddress
=
PowerMockito
.
mock
(
InetAddress
.
class
);
PowerMockito
.
when
(
inetAddress
.
getAddress
()).
thenReturn
(
new
byte
[]{
1
});
Assert
.
assertArrayEquals
(
new
byte
[]{-
84
,
17
,
0
,
1
},
UtilAll
.
getIP
());
}
@PrepareForTest
({
UtilAll
.
class
,
Enumeration
.
class
,
NetworkInterface
.
class
})
@Test
public
void
testGetIPOutputRuntimeException
()
throws
Exception
{
PowerMockito
.
mockStatic
(
NetworkInterface
.
class
);
Enumeration
enumeration
=
PowerMockito
.
mock
(
Enumeration
.
class
);
PowerMockito
.
when
(
enumeration
.
hasMoreElements
()).
thenReturn
(
false
);
PowerMockito
.
doReturn
(
enumeration
).
when
(
NetworkInterface
.
class
);
NetworkInterface
.
getNetworkInterfaces
();
thrown
.
expect
(
RuntimeException
.
class
);
UtilAll
.
getIP
();
// Method is not expected to return due to exception thrown
}
}
pom.xml
浏览文件 @
b81e90f0
...
...
@@ -439,17 +439,6 @@
<version>
3.10.0
</version>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
org.powermock
</groupId>
<artifactId>
powermock-api-mockito2
</artifactId>
<version>
1.7.0RC2
</version>
</dependency>
<dependency>
<groupId>
org.powermock
</groupId>
<artifactId>
powermock-module-junit4
</artifactId>
<version>
1.7.0RC2
</version>
<scope>
test
</scope>
</dependency>
</dependencies>
<dependencyManagement>
...
...
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
浏览文件 @
b81e90f0
...
...
@@ -426,17 +426,18 @@ public class DLedgerCommitLog extends CommitLog {
AppendFuture
<
AppendEntryResponse
>
dledgerFuture
;
EncodeResult
encodeResult
;
encodeResult
=
this
.
messageSerializer
.
serialize
(
msg
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
long
elapsedTimeInLock
;
long
queueOffset
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
encodeResult
=
this
.
messageSerializer
.
serialize
(
msg
);
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
false
);
AppendEntryRequest
request
=
new
AppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
...
...
@@ -542,6 +543,12 @@ public class DLedgerCommitLog extends CommitLog {
BatchAppendFuture
<
AppendEntryResponse
>
dledgerFuture
;
EncodeResult
encodeResult
;
encodeResult
=
this
.
messageSerializer
.
serialize
(
messageExtBatch
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
msgIdBuilder
.
setLength
(
0
);
long
elapsedTimeInLock
;
...
...
@@ -549,12 +556,8 @@ public class DLedgerCommitLog extends CommitLog {
long
msgNum
=
0
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
encodeResult
=
this
.
messageSerializer
.
serialize
(
messageExtBatch
);
queueOffset
=
topicQueueTable
.
get
(
encodeResult
.
queueOffsetKey
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
true
);
BatchAppendEntryRequest
request
=
new
BatchAppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
...
...
@@ -664,7 +667,7 @@ public class DLedgerCommitLog extends CommitLog {
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
false
);
AppendEntryRequest
request
=
new
AppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
...
...
@@ -779,7 +782,8 @@ public class DLedgerCommitLog extends CommitLog {
long
msgNum
=
0
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
queueOffset
=
topicQueueTable
.
get
(
encodeResult
.
queueOffsetKey
);
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
true
);
BatchAppendEntryRequest
request
=
new
BatchAppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
...
...
@@ -957,8 +961,15 @@ public class DLedgerCommitLog extends CommitLog {
this
.
queueOffsetKey
=
queueOffsetKey
;
}
public
void
setQueueOffsetKey
(
long
offset
)
{
data
.
putLong
(
MessageDecoder
.
QUEUE_OFFSET_POSITION
,
offset
);
public
void
setQueueOffsetKey
(
long
offset
,
boolean
isBatch
)
{
if
(!
isBatch
)
{
this
.
data
.
putLong
(
MessageDecoder
.
QUEUE_OFFSET_POSITION
,
offset
);
return
;
}
for
(
byte
[]
data
:
batchData
)
{
ByteBuffer
.
wrap
(
data
).
putLong
(
MessageDecoder
.
QUEUE_OFFSET_POSITION
,
offset
++);
}
}
public
byte
[]
getData
()
{
...
...
@@ -977,8 +988,6 @@ public class DLedgerCommitLog extends CommitLog {
// The maximum length of the message
private
final
int
maxMessageSize
;
// Build Message Key
private
final
StringBuilder
keyBuilder
=
new
StringBuilder
();
MessageSerializer
(
final
int
size
)
{
this
.
maxMessageSize
=
size
;
...
...
@@ -1079,17 +1088,7 @@ public class DLedgerCommitLog extends CommitLog {
}
public
EncodeResult
serialize
(
final
MessageExtBatch
messageExtBatch
)
{
keyBuilder
.
setLength
(
0
);
keyBuilder
.
append
(
messageExtBatch
.
getTopic
());
keyBuilder
.
append
(
'-'
);
keyBuilder
.
append
(
messageExtBatch
.
getQueueId
());
String
key
=
keyBuilder
.
toString
();
Long
queueOffset
=
DLedgerCommitLog
.
this
.
topicQueueTable
.
get
(
key
);
if
(
null
==
queueOffset
)
{
queueOffset
=
0L
;
DLedgerCommitLog
.
this
.
topicQueueTable
.
put
(
key
,
queueOffset
);
}
String
key
=
messageExtBatch
.
getTopic
()
+
"-"
+
messageExtBatch
.
getQueueId
();
int
totalMsgLen
=
0
;
ByteBuffer
messagesByteBuff
=
messageExtBatch
.
wrap
();
...
...
@@ -1154,7 +1153,7 @@ public class DLedgerCommitLog extends CommitLog {
// 5 FLAG
msgStoreItemMemory
.
putInt
(
flag
);
// 6 QUEUEOFFSET
msgStoreItemMemory
.
putLong
(
queueOffset
++
);
msgStoreItemMemory
.
putLong
(
0L
);
// 7 PHYSICALOFFSET
msgStoreItemMemory
.
putLong
(
0
);
// 8 SYSFLAG
...
...
@@ -1210,6 +1209,7 @@ public class DLedgerCommitLog extends CommitLog {
this
.
sbr
=
sbr
;
}
@Override
public
synchronized
void
release
()
{
super
.
release
();
if
(
sbr
!=
null
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录