Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
a5ea4e45
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
a5ea4e45
编写于
4月 17, 2017
作者:
Y
yukon
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add PushConsumer related implementation for OpenMessaging.
上级
ce146934
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
402 addition
and
20 deletion
+402
-20
example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
...apache/rocketmq/example/openmessaging/SimpleProducer.java
+3
-3
example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
...he/rocketmq/example/openmessaging/SimplePushConsumer.java
+58
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
...a/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+9
-6
openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
...ging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
+57
-3
openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
.../io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+62
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
.../io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+188
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
...va/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+6
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
...ava/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+9
-1
openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
.../openmessaging/rocketmq/producer/AbstractOMSProducer.java
+9
-6
pom.xml
pom.xml
+1
-1
未找到文件。
example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
浏览文件 @
a5ea4e45
...
@@ -47,14 +47,14 @@ public class SimpleProducer {
...
@@ -47,14 +47,14 @@ public class SimpleProducer {
}));
}));
{
{
Message
message
=
producer
.
createBytesMessageToTopic
(
"
HELLO_TOPIC"
,
"
HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
)));
Message
message
=
producer
.
createBytesMessageToTopic
(
"
OMS_HELLO_TOPIC"
,
"OMS_
HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
)));
SendResult
sendResult
=
producer
.
send
(
message
);
SendResult
sendResult
=
producer
.
send
(
message
);
//final Void aVoid = result.get(3000L);
//final Void aVoid = result.get(3000L);
System
.
out
.
println
(
"send async message OK, msgId: "
+
sendResult
.
messageId
());
System
.
out
.
println
(
"send async message OK, msgId: "
+
sendResult
.
messageId
());
}
}
{
{
final
Promise
<
SendResult
>
result
=
producer
.
sendAsync
(
producer
.
createBytesMessageToTopic
(
"
HELLO_TOPIC"
,
"
HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
))));
final
Promise
<
SendResult
>
result
=
producer
.
sendAsync
(
producer
.
createBytesMessageToTopic
(
"
OMS_HELLO_TOPIC"
,
"OMS_
HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
))));
result
.
addListener
(
new
PromiseListener
<
SendResult
>()
{
result
.
addListener
(
new
PromiseListener
<
SendResult
>()
{
@Override
public
void
operationCompleted
(
Promise
<
SendResult
>
promise
)
{
@Override
public
void
operationCompleted
(
Promise
<
SendResult
>
promise
)
{
System
.
out
.
println
(
"Send async message OK, msgId: "
+
promise
.
get
().
messageId
());
System
.
out
.
println
(
"Send async message OK, msgId: "
+
promise
.
get
().
messageId
());
...
@@ -67,7 +67,7 @@ public class SimpleProducer {
...
@@ -67,7 +67,7 @@ public class SimpleProducer {
}
}
{
{
producer
.
sendOneway
(
producer
.
createBytesMessageToTopic
(
"
HELLO_TOPIC"
,
"
HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
))));
producer
.
sendOneway
(
producer
.
createBytesMessageToTopic
(
"
OMS_HELLO_TOPIC"
,
"OMS_
HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
))));
System
.
out
.
println
(
"Send oneway message OK"
);
System
.
out
.
println
(
"Send oneway message OK"
);
}
}
}
}
...
...
example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
0 → 100644
浏览文件 @
a5ea4e45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.example.openmessaging
;
import
io.openmessaging.Message
;
import
io.openmessaging.MessageListener
;
import
io.openmessaging.MessagingAccessPoint
;
import
io.openmessaging.MessagingAccessPointFactory
;
import
io.openmessaging.OMS
;
import
io.openmessaging.PushConsumer
;
import
io.openmessaging.ReceivedMessageContext
;
import
io.openmessaging.rocketmq.domain.NonStandardKeys
;
public
class
SimplePushConsumer
{
public
static
void
main
(
String
[]
args
)
{
final
MessagingAccessPoint
messagingAccessPoint
=
MessagingAccessPointFactory
.
getMessagingAccessPoint
(
"openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"
);
final
PushConsumer
consumer
=
messagingAccessPoint
.
createPushConsumer
(
OMS
.
newKeyValue
().
put
(
NonStandardKeys
.
CONSUMER_GROUP
,
"OMS_CONSUMER"
));
messagingAccessPoint
.
startup
();
System
.
out
.
println
(
"messagingAccessPoint startup OK"
);
Runtime
.
getRuntime
().
addShutdownHook
(
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
consumer
.
shutdown
();
messagingAccessPoint
.
shutdown
();
}
}));
consumer
.
attachQueue
(
"OMS_HELLO_TOPIC"
,
new
MessageListener
()
{
@Override
public
void
onMessage
(
final
Message
message
,
final
ReceivedMessageContext
context
)
{
System
.
out
.
println
(
"Received one message: "
+
message
);
context
.
ack
();
}
});
consumer
.
startup
();
System
.
out
.
println
(
"consumer startup OK"
);
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
浏览文件 @
a5ea4e45
...
@@ -25,7 +25,10 @@ import io.openmessaging.PushConsumer;
...
@@ -25,7 +25,10 @@ import io.openmessaging.PushConsumer;
import
io.openmessaging.ResourceManager
;
import
io.openmessaging.ResourceManager
;
import
io.openmessaging.SequenceProducer
;
import
io.openmessaging.SequenceProducer
;
import
io.openmessaging.ServiceEndPoint
;
import
io.openmessaging.ServiceEndPoint
;
import
io.openmessaging.exception.OMSNotSupportedException
;
import
io.openmessaging.observer.Observer
;
import
io.openmessaging.observer.Observer
;
import
io.openmessaging.rocketmq.consumer.PullConsumerImpl
;
import
io.openmessaging.rocketmq.consumer.PushConsumerImpl
;
import
io.openmessaging.rocketmq.producer.ProducerImpl
;
import
io.openmessaging.rocketmq.producer.ProducerImpl
;
import
io.openmessaging.rocketmq.producer.SequenceProducerImpl
;
import
io.openmessaging.rocketmq.producer.SequenceProducerImpl
;
...
@@ -63,32 +66,32 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
...
@@ -63,32 +66,32 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
@Override
@Override
public
PushConsumer
createPushConsumer
()
{
public
PushConsumer
createPushConsumer
()
{
return
n
ull
;
return
n
ew
PushConsumerImpl
(
accessPointProperties
)
;
}
}
@Override
@Override
public
PushConsumer
createPushConsumer
(
KeyValue
properties
)
{
public
PushConsumer
createPushConsumer
(
KeyValue
properties
)
{
return
n
ull
;
return
n
ew
PushConsumerImpl
(
OMSUtil
.
buildKeyValue
(
this
.
accessPointProperties
,
properties
))
;
}
}
@Override
@Override
public
PullConsumer
createPullConsumer
(
String
queueName
)
{
public
PullConsumer
createPullConsumer
(
String
queueName
)
{
return
n
ull
;
return
n
ew
PullConsumerImpl
(
accessPointProperties
)
;
}
}
@Override
@Override
public
PullConsumer
createPullConsumer
(
String
queueName
,
KeyValue
properties
)
{
public
PullConsumer
createPullConsumer
(
String
queueName
,
KeyValue
properties
)
{
return
n
ull
;
return
n
ew
PullConsumerImpl
(
OMSUtil
.
buildKeyValue
(
this
.
accessPointProperties
,
properties
))
;
}
}
@Override
@Override
public
IterableConsumer
createIterableConsumer
(
String
queueName
)
{
public
IterableConsumer
createIterableConsumer
(
String
queueName
)
{
return
null
;
throw
new
OMSNotSupportedException
(
"-1"
,
"IterableConsumer is not supported in RocketMQ"
)
;
}
}
@Override
@Override
public
IterableConsumer
createIterableConsumer
(
String
queueName
,
KeyValue
properties
)
{
public
IterableConsumer
createIterableConsumer
(
String
queueName
,
KeyValue
properties
)
{
return
null
;
throw
new
OMSNotSupportedException
(
"-1"
,
"IterableConsumer is not supported in RocketMQ"
)
;
}
}
@Override
@Override
...
...
openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
浏览文件 @
a5ea4e45
...
@@ -21,7 +21,12 @@ import io.openmessaging.KeyValue;
...
@@ -21,7 +21,12 @@ import io.openmessaging.KeyValue;
import
io.openmessaging.MessageHeader
;
import
io.openmessaging.MessageHeader
;
import
io.openmessaging.OMS
;
import
io.openmessaging.OMS
;
import
io.openmessaging.SendResult
;
import
io.openmessaging.SendResult
;
import
io.openmessaging.rocketmq.domain.BytesMessageImpl
;
import
io.openmessaging.rocketmq.domain.NonStandardKeys
;
import
io.openmessaging.rocketmq.domain.SendResultImpl
;
import
io.openmessaging.rocketmq.domain.SendResultImpl
;
import
java.lang.reflect.Field
;
import
java.util.Map
;
import
java.util.Set
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
...
@@ -45,8 +50,13 @@ public class OMSUtil {
...
@@ -45,8 +50,13 @@ public class OMSUtil {
KeyValue
properties
=
omsMessage
.
properties
();
KeyValue
properties
=
omsMessage
.
properties
();
//All destinations in RocketMQ use Topic
//All destinations in RocketMQ use Topic
rmqMessage
.
setTopic
(
headers
.
containsKey
(
MessageHeader
.
TOPIC
)
if
(
headers
.
containsKey
(
MessageHeader
.
TOPIC
))
{
?
headers
.
getString
(
MessageHeader
.
TOPIC
)
:
headers
.
getString
(
MessageHeader
.
QUEUE
));
rmqMessage
.
setTopic
(
headers
.
getString
(
MessageHeader
.
TOPIC
));
rmqMessage
.
putUserProperty
(
NonStandardKeys
.
MESSAGE_DESTINATION
,
"TOPIC"
);
}
else
{
rmqMessage
.
setTopic
(
headers
.
getString
(
MessageHeader
.
QUEUE
));
rmqMessage
.
putUserProperty
(
NonStandardKeys
.
MESSAGE_DESTINATION
,
"QUEUE"
);
}
for
(
String
key
:
properties
.
keySet
())
{
for
(
String
key
:
properties
.
keySet
())
{
MessageAccessor
.
putProperty
(
rmqMessage
,
key
,
properties
.
getString
(
key
));
MessageAccessor
.
putProperty
(
rmqMessage
,
key
,
properties
.
getString
(
key
));
...
@@ -60,6 +70,50 @@ public class OMSUtil {
...
@@ -60,6 +70,50 @@ public class OMSUtil {
return
rmqMessage
;
return
rmqMessage
;
}
}
public
static
BytesMessage
msgConvert
(
org
.
apache
.
rocketmq
.
common
.
message
.
MessageExt
rmqMsg
)
{
BytesMessage
omsMsg
=
new
BytesMessageImpl
();
omsMsg
.
setBody
(
rmqMsg
.
getBody
());
KeyValue
headers
=
omsMsg
.
headers
();
KeyValue
properties
=
omsMsg
.
properties
();
final
Set
<
Map
.
Entry
<
String
,
String
>>
entries
=
rmqMsg
.
getProperties
().
entrySet
();
for
(
final
Map
.
Entry
<
String
,
String
>
entry
:
entries
)
{
if
(
isOMSHeader
(
entry
.
getKey
()))
{
headers
.
put
(
entry
.
getKey
(),
entry
.
getValue
());
}
else
{
properties
.
put
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
omsMsg
.
putHeaders
(
MessageHeader
.
MESSAGE_ID
,
rmqMsg
.
getMsgId
());
if
(
rmqMsg
.
getProperties
().
get
(
NonStandardKeys
.
MESSAGE_DESTINATION
).
equals
(
"TOPIC"
))
{
omsMsg
.
putHeaders
(
MessageHeader
.
TOPIC
,
rmqMsg
.
getTopic
());
}
else
{
omsMsg
.
putHeaders
(
MessageHeader
.
QUEUE
,
rmqMsg
.
getTopic
());
}
omsMsg
.
putHeaders
(
MessageHeader
.
SEARCH_KEY
,
rmqMsg
.
getKeys
());
omsMsg
.
putHeaders
(
MessageHeader
.
BORN_HOST
,
String
.
valueOf
(
rmqMsg
.
getBornHost
()));
omsMsg
.
putHeaders
(
MessageHeader
.
BORN_TIMESTAMP
,
rmqMsg
.
getBornTimestamp
());
omsMsg
.
putHeaders
(
MessageHeader
.
STORE_HOST
,
String
.
valueOf
(
rmqMsg
.
getStoreHost
()));
omsMsg
.
putHeaders
(
MessageHeader
.
STORE_TIMESTAMP
,
rmqMsg
.
getStoreTimestamp
());
return
omsMsg
;
}
public
static
boolean
isOMSHeader
(
String
value
)
{
for
(
Field
field
:
MessageHeader
.
class
.
getDeclaredFields
())
{
try
{
if
(
field
.
get
(
MessageHeader
.
class
).
equals
(
value
))
{
return
true
;
}
}
catch
(
IllegalAccessException
e
)
{
return
false
;
}
}
return
false
;
}
/**
/**
* Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
* Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
*/
*/
...
@@ -68,7 +122,7 @@ public class OMSUtil {
...
@@ -68,7 +122,7 @@ public class OMSUtil {
return
new
SendResultImpl
(
rmqResult
.
getMsgId
(),
OMS
.
newKeyValue
());
return
new
SendResultImpl
(
rmqResult
.
getMsgId
(),
OMS
.
newKeyValue
());
}
}
public
static
KeyValue
buildKeyValue
(
KeyValue
...
keyValues
)
{
public
static
KeyValue
buildKeyValue
(
KeyValue
...
keyValues
)
{
KeyValue
keyValue
=
OMS
.
newKeyValue
();
KeyValue
keyValue
=
OMS
.
newKeyValue
();
for
(
KeyValue
properties
:
keyValues
)
{
for
(
KeyValue
properties
:
keyValues
)
{
for
(
String
key
:
properties
.
keySet
())
{
for
(
String
key
:
properties
.
keySet
())
{
...
...
openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
0 → 100644
浏览文件 @
a5ea4e45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.consumer
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.Message
;
import
io.openmessaging.PullConsumer
;
public
class
PullConsumerImpl
implements
PullConsumer
{
public
PullConsumerImpl
(
final
KeyValue
properties
)
{
}
@Override
public
KeyValue
properties
()
{
return
null
;
}
@Override
public
Message
poll
()
{
return
null
;
}
@Override
public
Message
poll
(
final
KeyValue
properties
)
{
return
null
;
}
@Override
public
void
ack
(
final
String
messageId
)
{
}
@Override
public
void
ack
(
final
String
messageId
,
final
KeyValue
properties
)
{
}
@Override
public
void
startup
()
{
}
@Override
public
void
shutdown
()
{
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
0 → 100644
浏览文件 @
a5ea4e45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.consumer
;
import
io.openmessaging.BytesMessage
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.MessageListener
;
import
io.openmessaging.OMS
;
import
io.openmessaging.PropertyKeys
;
import
io.openmessaging.PushConsumer
;
import
io.openmessaging.ReceivedMessageContext
;
import
io.openmessaging.exception.OMSRuntimeException
;
import
io.openmessaging.rocketmq.OMSUtil
;
import
io.openmessaging.rocketmq.domain.NonStandardKeys
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.message.MessageExt
;
public
class
PushConsumerImpl
implements
PushConsumer
{
private
final
DefaultMQPushConsumer
rocketmqPushConsumer
;
private
final
KeyValue
properties
;
private
boolean
started
=
false
;
private
final
Map
<
String
,
MessageListener
>
subscribeTable
=
new
ConcurrentHashMap
<>();
public
PushConsumerImpl
(
final
KeyValue
properties
)
{
this
.
rocketmqPushConsumer
=
new
DefaultMQPushConsumer
();
this
.
properties
=
properties
;
String
accessPoints
=
properties
.
getString
(
PropertyKeys
.
ACCESS_POINTS
);
if
(
accessPoints
==
null
||
accessPoints
.
isEmpty
())
{
throw
new
OMSRuntimeException
(
"-1"
,
"OMS AccessPoints is null or empty."
);
}
this
.
rocketmqPushConsumer
.
setNamesrvAddr
(
accessPoints
.
replace
(
','
,
';'
));
String
consumerGroup
=
properties
.
getString
(
NonStandardKeys
.
CONSUMER_GROUP
);
if
(
null
==
consumerGroup
||
consumerGroup
.
isEmpty
())
{
throw
new
OMSRuntimeException
(
"-1"
,
"Consumer Group is necessary for RocketMQ, please set it."
);
}
this
.
rocketmqPushConsumer
.
setConsumerGroup
(
consumerGroup
);
int
maxReDeliveryTimes
=
properties
.
getInt
(
NonStandardKeys
.
MAX_REDELIVERY_TIMES
);
if
(
maxReDeliveryTimes
!=
0
)
{
this
.
rocketmqPushConsumer
.
setMaxReconsumeTimes
(
maxReDeliveryTimes
);
}
int
messageConsumeTimeout
=
properties
.
getInt
(
NonStandardKeys
.
MESSAGE_CONSUME_TIMEOUT
);
if
(
messageConsumeTimeout
!=
0
)
{
this
.
rocketmqPushConsumer
.
setConsumeTimeout
(
messageConsumeTimeout
);
}
int
maxConsumeThreadNums
=
properties
.
getInt
(
NonStandardKeys
.
MAX_CONSUME_THREAD_NUMS
);
if
(
maxConsumeThreadNums
!=
0
)
{
this
.
rocketmqPushConsumer
.
setConsumeThreadMax
(
maxConsumeThreadNums
);
}
int
minConsumeThreadNums
=
properties
.
getInt
(
NonStandardKeys
.
MIN_CONSUME_THREAD_NUMS
);
if
(
minConsumeThreadNums
!=
0
)
{
this
.
rocketmqPushConsumer
.
setConsumeThreadMin
(
minConsumeThreadNums
);
}
String
consumerId
=
OMSUtil
.
buildInstanceName
();
this
.
rocketmqPushConsumer
.
setInstanceName
(
consumerId
);
properties
.
put
(
PropertyKeys
.
CONSUMER_ID
,
consumerId
);
this
.
rocketmqPushConsumer
.
registerMessageListener
(
new
MessageListenerImpl
());
}
@Override
public
KeyValue
properties
()
{
return
properties
;
}
@Override
public
void
resume
()
{
this
.
rocketmqPushConsumer
.
resume
();
}
@Override
public
void
suspend
()
{
this
.
rocketmqPushConsumer
.
suspend
();
}
@Override
public
boolean
isSuspended
()
{
return
this
.
rocketmqPushConsumer
.
getDefaultMQPushConsumerImpl
().
isPause
();
}
@Override
public
PushConsumer
attachQueue
(
final
String
queueName
,
final
MessageListener
listener
)
{
this
.
subscribeTable
.
put
(
queueName
,
listener
);
try
{
this
.
rocketmqPushConsumer
.
subscribe
(
queueName
,
"*"
);
}
catch
(
MQClientException
e
)
{
throw
new
OMSRuntimeException
(
"-1"
,
String
.
format
(
"RocketMQ push consumer can't attach to %s."
,
queueName
));
}
return
this
;
}
@Override
public
synchronized
void
startup
()
{
if
(!
started
)
{
try
{
this
.
rocketmqPushConsumer
.
start
();
}
catch
(
MQClientException
e
)
{
throw
new
OMSRuntimeException
(
"-1"
,
e
);
}
}
this
.
started
=
true
;
}
@Override
public
synchronized
void
shutdown
()
{
if
(
this
.
started
)
{
this
.
rocketmqPushConsumer
.
shutdown
();
}
this
.
started
=
false
;
}
class
MessageListenerImpl
implements
MessageListenerConcurrently
{
@Override
public
ConsumeConcurrentlyStatus
consumeMessage
(
List
<
MessageExt
>
rmqMsgList
,
ConsumeConcurrentlyContext
contextRMQ
)
{
MessageExt
rmqMsg
=
rmqMsgList
.
get
(
0
);
BytesMessage
omsMsg
=
OMSUtil
.
msgConvert
(
rmqMsg
);
MessageListener
listener
=
PushConsumerImpl
.
this
.
subscribeTable
.
get
(
rmqMsg
.
getTopic
());
if
(
listener
==
null
)
{
throw
new
OMSRuntimeException
(
"-1"
,
String
.
format
(
"The topic/queue %s isn't attached to this consumer"
,
rmqMsg
.
getTopic
()));
}
final
KeyValue
contextProperties
=
OMS
.
newKeyValue
();
final
CountDownLatch
sync
=
new
CountDownLatch
(
1
);
ReceivedMessageContext
context
=
new
ReceivedMessageContext
()
{
@Override
public
KeyValue
properties
()
{
return
contextProperties
;
}
@Override
public
void
ack
()
{
sync
.
countDown
();
contextProperties
.
put
(
NonStandardKeys
.
MESSAGE_CONSUME_STATUS
,
ConsumeConcurrentlyStatus
.
CONSUME_SUCCESS
.
name
());
}
@Override
public
void
ack
(
final
KeyValue
properties
)
{
sync
.
countDown
();
contextProperties
.
put
(
NonStandardKeys
.
MESSAGE_CONSUME_STATUS
,
properties
.
getString
(
NonStandardKeys
.
MESSAGE_CONSUME_STATUS
));
}
};
listener
.
onMessage
(
omsMsg
,
context
);
try
{
sync
.
await
(
PushConsumerImpl
.
this
.
rocketmqPushConsumer
.
getConsumeTimeout
(),
TimeUnit
.
MILLISECONDS
);
}
catch
(
InterruptedException
ignore
)
{
}
return
ConsumeConcurrentlyStatus
.
valueOf
(
contextProperties
.
getString
(
NonStandardKeys
.
MESSAGE_CONSUME_STATUS
));
}
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
浏览文件 @
a5ea4e45
...
@@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage;
...
@@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.Message
;
import
io.openmessaging.Message
;
import
io.openmessaging.OMS
;
import
io.openmessaging.OMS
;
import
org.apache.commons.lang3.builder.ToStringBuilder
;
public
class
BytesMessageImpl
implements
BytesMessage
{
public
class
BytesMessageImpl
implements
BytesMessage
{
private
KeyValue
headers
;
private
KeyValue
headers
;
...
@@ -99,4 +100,9 @@ public class BytesMessageImpl implements BytesMessage {
...
@@ -99,4 +100,9 @@ public class BytesMessageImpl implements BytesMessage {
properties
.
put
(
key
,
value
);
properties
.
put
(
key
,
value
);
return
this
;
return
this
;
}
}
@Override
public
String
toString
()
{
return
ToStringBuilder
.
reflectionToString
(
this
);
}
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
浏览文件 @
a5ea4e45
...
@@ -16,5 +16,13 @@
...
@@ -16,5 +16,13 @@
*/
*/
package
io.openmessaging.rocketmq.domain
;
package
io.openmessaging.rocketmq.domain
;
public
class
NonStandardKeys
{
public
interface
NonStandardKeys
{
String
CONSUMER_GROUP
=
"rmq.consumer.group"
;
String
PRODUCER_GROUP
=
"rmq.producer.group"
;
String
MAX_REDELIVERY_TIMES
=
"rmq.max.redelivery.times"
;
String
MESSAGE_CONSUME_TIMEOUT
=
"rmq.message.consume.timeout"
;
String
MAX_CONSUME_THREAD_NUMS
=
"rmq.max.consume.thread.nums"
;
String
MIN_CONSUME_THREAD_NUMS
=
"rmq.min.consume.thread.nums"
;
String
MESSAGE_CONSUME_STATUS
=
"rmq.message.consume.status"
;
String
MESSAGE_DESTINATION
=
"rmq.message.destination"
;
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
浏览文件 @
a5ea4e45
...
@@ -28,6 +28,7 @@ import io.openmessaging.exception.OMSNotSupportedException;
...
@@ -28,6 +28,7 @@ import io.openmessaging.exception.OMSNotSupportedException;
import
io.openmessaging.exception.OMSRuntimeException
;
import
io.openmessaging.exception.OMSRuntimeException
;
import
io.openmessaging.exception.OMSTimeOutException
;
import
io.openmessaging.exception.OMSTimeOutException
;
import
io.openmessaging.rocketmq.domain.BytesMessageImpl
;
import
io.openmessaging.rocketmq.domain.BytesMessageImpl
;
import
io.openmessaging.rocketmq.domain.NonStandardKeys
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.client.log.ClientLogger
;
...
@@ -50,20 +51,22 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{
...
@@ -50,20 +51,22 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{
this
.
rocketmqProducer
=
new
DefaultMQProducer
();
this
.
rocketmqProducer
=
new
DefaultMQProducer
();
String
accessPoints
=
properties
.
getString
(
PropertyKeys
.
ACCESS_POINTS
);
String
accessPoints
=
properties
.
getString
(
PropertyKeys
.
ACCESS_POINTS
);
if
(
accessPoints
==
null
||
accessPoints
.
isEmpty
())
{
if
(
accessPoints
==
null
||
accessPoints
.
isEmpty
())
{
throw
new
OMSRuntimeException
(
"-1"
,
"OMS AccessPoints is null or empty."
);
throw
new
OMSRuntimeException
(
"-1"
,
"OMS AccessPoints is null or empty."
);
}
}
String
producerId
=
buildInstanceName
(
);
this
.
rocketmqProducer
.
setNamesrvAddr
(
accessPoints
.
replace
(
','
,
';'
)
);
int
operationTimeout
=
properties
.
getInt
(
PropertyKeys
.
OPERATION_TIMEOUT
);
String
producerGroup
=
properties
.
getString
(
NonStandardKeys
.
PRODUCER_GROUP
);
if
(
producerGroup
==
null
||
producerGroup
.
isEmpty
())
{
producerGroup
=
"__OMS_PRODUCER_DEFAULT_GROUP"
;
}
this
.
rocketmqProducer
.
setProducerGroup
(
producerGroup
);
this
.
rocketmqProducer
.
setProducerGroup
(
producerId
);
String
producerId
=
buildInstanceName
();
int
operationTimeout
=
properties
.
getInt
(
PropertyKeys
.
OPERATION_TIMEOUT
);
this
.
rocketmqProducer
.
setSendMsgTimeout
(
operationTimeout
==
0
?
5000
:
operationTimeout
);
this
.
rocketmqProducer
.
setSendMsgTimeout
(
operationTimeout
==
0
?
5000
:
operationTimeout
);
this
.
rocketmqProducer
.
setInstanceName
(
producerId
);
this
.
rocketmqProducer
.
setInstanceName
(
producerId
);
this
.
rocketmqProducer
.
setNamesrvAddr
(
accessPoints
.
replace
(
','
,
';'
));
this
.
rocketmqProducer
.
setMaxMessageSize
(
1024
*
1024
*
4
);
this
.
rocketmqProducer
.
setMaxMessageSize
(
1024
*
1024
*
4
);
properties
.
put
(
PropertyKeys
.
PRODUCER_ID
,
producerId
);
properties
.
put
(
PropertyKeys
.
PRODUCER_ID
,
producerId
);
}
}
...
...
pom.xml
浏览文件 @
a5ea4e45
...
@@ -607,7 +607,7 @@
...
@@ -607,7 +607,7 @@
<dependency>
<dependency>
<groupId>
io.openmessaging
</groupId>
<groupId>
io.openmessaging
</groupId>
<artifactId>
openmessaging-api
</artifactId>
<artifactId>
openmessaging-api
</artifactId>
<version>
0.1.0-
bet
a
</version>
<version>
0.1.0-
alph
a
</version>
</dependency>
</dependency>
</dependencies>
</dependencies>
</dependencyManagement>
</dependencyManagement>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录