Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
ce146934
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
267
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
ce146934
编写于
4月 11, 2017
作者:
Y
yukon
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add Producer related implementation for OpenMessaging.
上级
c60ac522
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
959 addition
and
8 deletion
+959
-8
example/pom.xml
example/pom.xml
+9
-0
example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
...apache/rocketmq/example/openmessaging/SimpleProducer.java
+51
-0
openmessaging/pom.xml
openmessaging/pom.xml
+12
-1
openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
...a/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+18
-5
openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
...ging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
+80
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
...va/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+102
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
...ava/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+20
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
...java/io/openmessaging/rocketmq/domain/SendResultImpl.java
+40
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
.../openmessaging/rocketmq/producer/AbstractOMSProducer.java
+138
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
...java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+124
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
...openmessaging/rocketmq/producer/SequenceProducerImpl.java
+91
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
...ava/io/openmessaging/rocketmq/promise/DefaultPromise.java
+227
-0
openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
...n/java/io/openmessaging/rocketmq/promise/FutureState.java
+45
-0
pom.xml
pom.xml
+2
-2
未找到文件。
example/pom.xml
浏览文件 @
ce146934
...
...
@@ -48,5 +48,14 @@
<groupId>
org.javassist
</groupId>
<artifactId>
javassist
</artifactId>
</dependency>
<dependency>
<groupId>
io.openmessaging
</groupId>
<artifactId>
openmessaging-api
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.rocketmq
</groupId>
<artifactId>
rocketmq-openmessaging
</artifactId>
<version>
4.1.0-incubating-SNAPSHOT
</version>
</dependency>
</dependencies>
</project>
example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
浏览文件 @
ce146934
...
...
@@ -16,8 +16,59 @@
*/
package
org.apache.rocketmq.example.openmessaging
;
import
io.openmessaging.Message
;
import
io.openmessaging.MessagingAccessPoint
;
import
io.openmessaging.MessagingAccessPointFactory
;
import
io.openmessaging.Producer
;
import
io.openmessaging.Promise
;
import
io.openmessaging.PromiseListener
;
import
io.openmessaging.SendResult
;
import
java.nio.charset.Charset
;
public
class
SimpleProducer
{
public
static
void
main
(
String
[]
args
)
{
final
MessagingAccessPoint
messagingAccessPoint
=
MessagingAccessPointFactory
.
getMessagingAccessPoint
(
"openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"
);
final
Producer
producer
=
messagingAccessPoint
.
createProducer
();
messagingAccessPoint
.
startup
();
System
.
out
.
println
(
"messagingAccessPoint startup OK"
);
producer
.
startup
();
System
.
out
.
println
(
"producer startup OK"
);
Runtime
.
getRuntime
().
addShutdownHook
(
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
producer
.
shutdown
();
messagingAccessPoint
.
shutdown
();
}
}));
{
Message
message
=
producer
.
createBytesMessageToTopic
(
"HELLO_TOPIC"
,
"HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
)));
SendResult
sendResult
=
producer
.
send
(
message
);
//final Void aVoid = result.get(3000L);
System
.
out
.
println
(
"send async message OK, msgId: "
+
sendResult
.
messageId
());
}
{
final
Promise
<
SendResult
>
result
=
producer
.
sendAsync
(
producer
.
createBytesMessageToTopic
(
"HELLO_TOPIC"
,
"HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
))));
result
.
addListener
(
new
PromiseListener
<
SendResult
>()
{
@Override
public
void
operationCompleted
(
Promise
<
SendResult
>
promise
)
{
System
.
out
.
println
(
"Send async message OK, msgId: "
+
promise
.
get
().
messageId
());
}
@Override
public
void
operationFailed
(
Promise
<
SendResult
>
promise
)
{
System
.
out
.
println
(
"send async message Failed, error: "
+
promise
.
getThrowable
().
getMessage
());
}
});
}
{
producer
.
sendOneway
(
producer
.
createBytesMessageToTopic
(
"HELLO_TOPIC"
,
"HELLO_BODY"
.
getBytes
(
Charset
.
forName
(
"UTF-8"
))));
System
.
out
.
println
(
"Send oneway message OK"
);
}
}
}
openmessaging/pom.xml
浏览文件 @
ce146934
...
...
@@ -27,11 +27,22 @@
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
rocketmq-openmessaging
</artifactId>
<name>
rocketmq-openmessaging ${project.version}
</name>
<dependencies>
<dependency>
<groupId>
io.openmessaging
</groupId>
<artifactId>
messaging-user-level-api
</artifactId>
<artifactId>
openmessaging-api
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.rocketmq
</groupId>
<artifactId>
rocketmq-client
</artifactId>
</dependency>
<dependency>
<groupId>
javax.jms
</groupId>
<artifactId>
javax.jms-api
</artifactId>
<version>
2.0.1
</version>
<scope>
test
</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
浏览文件 @
ce146934
...
...
@@ -26,26 +26,39 @@ import io.openmessaging.ResourceManager;
import
io.openmessaging.SequenceProducer
;
import
io.openmessaging.ServiceEndPoint
;
import
io.openmessaging.observer.Observer
;
import
io.openmessaging.rocketmq.producer.ProducerImpl
;
import
io.openmessaging.rocketmq.producer.SequenceProducerImpl
;
public
class
MessagingAccessPointImpl
implements
MessagingAccessPoint
{
private
final
KeyValue
accessPointProperties
;
public
MessagingAccessPointImpl
(
final
KeyValue
accessPointProperties
)
{
this
.
accessPointProperties
=
accessPointProperties
;
}
@Override
public
KeyValue
properties
()
{
return
accessPointProperties
;
}
@Override
public
Producer
createProducer
()
{
return
n
ull
;
return
n
ew
ProducerImpl
(
this
.
accessPointProperties
)
;
}
@Override
public
Producer
createProducer
(
KeyValue
properties
)
{
return
n
ull
;
return
n
ew
ProducerImpl
(
OMSUtil
.
buildKeyValue
(
this
.
accessPointProperties
,
properties
))
;
}
@Override
public
SequenceProducer
createSequenceProducer
()
{
return
n
ull
;
return
n
ew
SequenceProducerImpl
(
this
.
accessPointProperties
)
;
}
@Override
public
SequenceProducer
createSequenceProducer
(
KeyValue
properties
)
{
return
n
ull
;
return
n
ew
SequenceProducerImpl
(
OMSUtil
.
buildKeyValue
(
this
.
accessPointProperties
,
properties
))
;
}
@Override
...
...
@@ -79,7 +92,7 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
public
ResourceManager
create
ResourceManager
()
{
public
ResourceManager
get
ResourceManager
()
{
return
null
;
}
...
...
openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq
;
import
io.openmessaging.BytesMessage
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.MessageHeader
;
import
io.openmessaging.OMS
;
import
io.openmessaging.SendResult
;
import
io.openmessaging.rocketmq.domain.SendResultImpl
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
public
class
OMSUtil
{
/**
* Builds a OMS client instance name.
*
* @return a unique instance name
*/
public
static
String
buildInstanceName
()
{
return
Integer
.
toString
(
UtilAll
.
getPid
())
+
"%OpenMessaging"
+
"%"
+
System
.
nanoTime
();
}
public
static
org
.
apache
.
rocketmq
.
common
.
message
.
Message
msgConvert
(
BytesMessage
omsMessage
)
{
org
.
apache
.
rocketmq
.
common
.
message
.
Message
rmqMessage
=
new
org
.
apache
.
rocketmq
.
common
.
message
.
Message
();
rmqMessage
.
setBody
(
omsMessage
.
getBody
());
KeyValue
headers
=
omsMessage
.
headers
();
KeyValue
properties
=
omsMessage
.
properties
();
//All destinations in RocketMQ use Topic
rmqMessage
.
setTopic
(
headers
.
containsKey
(
MessageHeader
.
TOPIC
)
?
headers
.
getString
(
MessageHeader
.
TOPIC
)
:
headers
.
getString
(
MessageHeader
.
QUEUE
));
for
(
String
key
:
properties
.
keySet
())
{
MessageAccessor
.
putProperty
(
rmqMessage
,
key
,
properties
.
getString
(
key
));
}
//Headers has a high priority
for
(
String
key
:
headers
.
keySet
())
{
MessageAccessor
.
putProperty
(
rmqMessage
,
key
,
headers
.
getString
(
key
));
}
return
rmqMessage
;
}
/**
* Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
*/
public
static
SendResult
sendResultConvert
(
org
.
apache
.
rocketmq
.
client
.
producer
.
SendResult
rmqResult
)
{
assert
rmqResult
.
getSendStatus
().
equals
(
SendStatus
.
SEND_OK
);
return
new
SendResultImpl
(
rmqResult
.
getMsgId
(),
OMS
.
newKeyValue
());
}
public
static
KeyValue
buildKeyValue
(
KeyValue
...
keyValues
)
{
KeyValue
keyValue
=
OMS
.
newKeyValue
();
for
(
KeyValue
properties
:
keyValues
)
{
for
(
String
key
:
properties
.
keySet
())
{
keyValue
.
put
(
key
,
properties
.
getString
(
key
));
}
}
return
keyValue
;
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.domain
;
import
io.openmessaging.BytesMessage
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.Message
;
import
io.openmessaging.OMS
;
public
class
BytesMessageImpl
implements
BytesMessage
{
private
KeyValue
headers
;
private
KeyValue
properties
;
private
byte
[]
body
;
public
BytesMessageImpl
()
{
this
.
headers
=
OMS
.
newKeyValue
();
this
.
properties
=
OMS
.
newKeyValue
();
}
@Override
public
byte
[]
getBody
()
{
return
body
;
}
@Override
public
BytesMessage
setBody
(
final
byte
[]
body
)
{
this
.
body
=
body
;
return
this
;
}
@Override
public
KeyValue
headers
()
{
return
headers
;
}
@Override
public
KeyValue
properties
()
{
return
properties
;
}
@Override
public
Message
putHeaders
(
final
String
key
,
final
int
value
)
{
headers
.
put
(
key
,
value
);
return
this
;
}
@Override
public
Message
putHeaders
(
final
String
key
,
final
long
value
)
{
headers
.
put
(
key
,
value
);
return
this
;
}
@Override
public
Message
putHeaders
(
final
String
key
,
final
double
value
)
{
headers
.
put
(
key
,
value
);
return
this
;
}
@Override
public
Message
putHeaders
(
final
String
key
,
final
String
value
)
{
headers
.
put
(
key
,
value
);
return
this
;
}
@Override
public
Message
putProperties
(
final
String
key
,
final
int
value
)
{
properties
.
put
(
key
,
value
);
return
this
;
}
@Override
public
Message
putProperties
(
final
String
key
,
final
long
value
)
{
properties
.
put
(
key
,
value
);
return
this
;
}
@Override
public
Message
putProperties
(
final
String
key
,
final
double
value
)
{
properties
.
put
(
key
,
value
);
return
this
;
}
@Override
public
Message
putProperties
(
final
String
key
,
final
String
value
)
{
properties
.
put
(
key
,
value
);
return
this
;
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.domain
;
public
class
NonStandardKeys
{
}
openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.domain
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.SendResult
;
public
class
SendResultImpl
implements
SendResult
{
private
String
messageId
;
private
KeyValue
properties
;
public
SendResultImpl
(
final
String
messageId
,
final
KeyValue
properties
)
{
this
.
messageId
=
messageId
;
this
.
properties
=
properties
;
}
@Override
public
String
messageId
()
{
return
messageId
;
}
@Override
public
KeyValue
properties
()
{
return
properties
;
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.producer
;
import
io.openmessaging.BytesMessage
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.Message
;
import
io.openmessaging.MessageFactory
;
import
io.openmessaging.MessageHeader
;
import
io.openmessaging.PropertyKeys
;
import
io.openmessaging.ServiceLifecycle
;
import
io.openmessaging.exception.OMSMessageFormatException
;
import
io.openmessaging.exception.OMSNotSupportedException
;
import
io.openmessaging.exception.OMSRuntimeException
;
import
io.openmessaging.exception.OMSTimeOutException
;
import
io.openmessaging.rocketmq.domain.BytesMessageImpl
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException
;
import
org.slf4j.Logger
;
import
static
io
.
openmessaging
.
rocketmq
.
OMSUtil
.
buildInstanceName
;
abstract
class
AbstractOMSProducer
implements
ServiceLifecycle
,
MessageFactory
{
final
static
Logger
log
=
ClientLogger
.
getLog
();
final
KeyValue
properties
;
final
DefaultMQProducer
rocketmqProducer
;
private
boolean
started
=
false
;
AbstractOMSProducer
(
final
KeyValue
properties
)
{
this
.
properties
=
properties
;
this
.
rocketmqProducer
=
new
DefaultMQProducer
();
String
accessPoints
=
properties
.
getString
(
PropertyKeys
.
ACCESS_POINTS
);
if
(
accessPoints
==
null
||
accessPoints
.
isEmpty
())
{
throw
new
OMSRuntimeException
(
"-1"
,
"OMS AccessPoints is null or empty."
);
}
String
producerId
=
buildInstanceName
();
int
operationTimeout
=
properties
.
getInt
(
PropertyKeys
.
OPERATION_TIMEOUT
);
this
.
rocketmqProducer
.
setProducerGroup
(
producerId
);
this
.
rocketmqProducer
.
setSendMsgTimeout
(
operationTimeout
==
0
?
5000
:
operationTimeout
);
this
.
rocketmqProducer
.
setInstanceName
(
producerId
);
this
.
rocketmqProducer
.
setNamesrvAddr
(
accessPoints
.
replace
(
','
,
';'
));
this
.
rocketmqProducer
.
setMaxMessageSize
(
1024
*
1024
*
4
);
properties
.
put
(
PropertyKeys
.
PRODUCER_ID
,
producerId
);
}
@Override
public
synchronized
void
startup
()
{
if
(!
started
)
{
try
{
this
.
rocketmqProducer
.
start
();
}
catch
(
MQClientException
e
)
{
throw
new
OMSRuntimeException
(
"-1"
,
e
);
}
}
this
.
started
=
true
;
}
@Override
public
synchronized
void
shutdown
()
{
if
(
this
.
started
)
{
this
.
rocketmqProducer
.
shutdown
();
}
this
.
started
=
false
;
}
OMSRuntimeException
checkProducerException
(
String
topic
,
String
msgId
,
Throwable
e
)
{
if
(
e
instanceof
MQClientException
)
{
if
(
e
.
getCause
()
!=
null
)
{
if
(
e
.
getCause
()
instanceof
RemotingTimeoutException
)
{
return
new
OMSTimeOutException
(
"-1"
,
String
.
format
(
"Send message to broker timeout, %dms, Topic=%s, msgId=%s"
,
this
.
rocketmqProducer
.
getSendMsgTimeout
(),
topic
,
msgId
),
e
);
}
else
if
(
e
.
getCause
()
instanceof
MQBrokerException
||
e
.
getCause
()
instanceof
RemotingConnectException
)
{
MQBrokerException
brokerException
=
(
MQBrokerException
)
e
.
getCause
();
return
new
OMSRuntimeException
(
"-1"
,
String
.
format
(
"Received a broker exception, Topic=%s, msgId=%s, %s"
,
topic
,
msgId
,
brokerException
.
getErrorMessage
()),
e
);
}
}
// Exception thrown by local.
else
{
MQClientException
clientException
=
(
MQClientException
)
e
;
if
(-
1
==
clientException
.
getResponseCode
())
{
return
new
OMSRuntimeException
(
"-1"
,
String
.
format
(
"Topic does not exist, Topic=%s, msgId=%s"
,
topic
,
msgId
),
e
);
}
else
if
(
ResponseCode
.
MESSAGE_ILLEGAL
==
clientException
.
getResponseCode
())
{
return
new
OMSMessageFormatException
(
"-1"
,
String
.
format
(
"A illegal message for RocketMQ, Topic=%s, msgId=%s"
,
topic
,
msgId
),
e
);
}
}
}
return
new
OMSRuntimeException
(
"-1"
,
"Send message to RocketMQ broker failed."
,
e
);
}
protected
void
checkMessageType
(
Message
message
)
{
if
(!(
message
instanceof
BytesMessage
))
{
throw
new
OMSNotSupportedException
(
"-1"
,
"Only BytesMessage is supported."
);
}
}
@Override
public
BytesMessage
createBytesMessageToTopic
(
final
String
topic
,
final
byte
[]
body
)
{
BytesMessage
bytesMessage
=
new
BytesMessageImpl
();
bytesMessage
.
setBody
(
body
);
bytesMessage
.
headers
().
put
(
MessageHeader
.
TOPIC
,
topic
);
return
bytesMessage
;
}
@Override
public
BytesMessage
createBytesMessageToQueue
(
final
String
queue
,
final
byte
[]
body
)
{
BytesMessage
bytesMessage
=
new
BytesMessageImpl
();
bytesMessage
.
setBody
(
body
);
bytesMessage
.
headers
().
put
(
MessageHeader
.
QUEUE
,
queue
);
return
bytesMessage
;
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.producer
;
import
io.openmessaging.BytesMessage
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.Message
;
import
io.openmessaging.MessageHeader
;
import
io.openmessaging.Producer
;
import
io.openmessaging.Promise
;
import
io.openmessaging.PropertyKeys
;
import
io.openmessaging.SendResult
;
import
io.openmessaging.exception.OMSRuntimeException
;
import
io.openmessaging.rocketmq.OMSUtil
;
import
io.openmessaging.rocketmq.promise.DefaultPromise
;
import
org.apache.rocketmq.client.producer.SendCallback
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
static
io
.
openmessaging
.
rocketmq
.
OMSUtil
.
msgConvert
;
public
class
ProducerImpl
extends
AbstractOMSProducer
implements
Producer
{
public
ProducerImpl
(
final
KeyValue
properties
)
{
super
(
properties
);
}
@Override
public
KeyValue
properties
()
{
return
properties
;
}
@Override
public
SendResult
send
(
final
Message
message
)
{
return
send
(
message
,
this
.
rocketmqProducer
.
getSendMsgTimeout
());
}
@Override
public
SendResult
send
(
final
Message
message
,
final
KeyValue
properties
)
{
long
timeout
=
properties
.
containsKey
(
PropertyKeys
.
OPERATION_TIMEOUT
)
?
properties
.
getInt
(
PropertyKeys
.
OPERATION_TIMEOUT
)
:
this
.
rocketmqProducer
.
getSendMsgTimeout
();
return
send
(
message
,
timeout
);
}
private
SendResult
send
(
final
Message
message
,
long
timeout
)
{
checkMessageType
(
message
);
org
.
apache
.
rocketmq
.
common
.
message
.
Message
rmqMessage
=
msgConvert
((
BytesMessage
)
message
);
try
{
org
.
apache
.
rocketmq
.
client
.
producer
.
SendResult
rmqResult
=
this
.
rocketmqProducer
.
send
(
rmqMessage
,
timeout
);
if
(!
rmqResult
.
getSendStatus
().
equals
(
SendStatus
.
SEND_OK
))
{
log
.
error
(
String
.
format
(
"Send message to RocketMQ failed, %s"
,
message
));
throw
new
OMSRuntimeException
(
"-1"
,
"Send message to RocketMQ failed."
);
}
message
.
headers
().
put
(
MessageHeader
.
MESSAGE_ID
,
rmqResult
.
getMsgId
());
return
OMSUtil
.
sendResultConvert
(
rmqResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
String
.
format
(
"Send message to RocketMQ failed, %s"
,
message
),
e
);
throw
checkProducerException
(
rmqMessage
.
getTopic
(),
message
.
headers
().
getString
(
MessageHeader
.
MESSAGE_ID
),
e
);
}
}
@Override
public
Promise
<
SendResult
>
sendAsync
(
final
Message
message
)
{
return
sendAsync
(
message
,
this
.
rocketmqProducer
.
getSendMsgTimeout
());
}
@Override
public
Promise
<
SendResult
>
sendAsync
(
final
Message
message
,
final
KeyValue
properties
)
{
long
timeout
=
properties
.
containsKey
(
PropertyKeys
.
OPERATION_TIMEOUT
)
?
properties
.
getInt
(
PropertyKeys
.
OPERATION_TIMEOUT
)
:
this
.
rocketmqProducer
.
getSendMsgTimeout
();
return
sendAsync
(
message
,
timeout
);
}
private
Promise
<
SendResult
>
sendAsync
(
final
Message
message
,
long
timeout
)
{
checkMessageType
(
message
);
org
.
apache
.
rocketmq
.
common
.
message
.
Message
rmqMessage
=
msgConvert
((
BytesMessage
)
message
);
final
Promise
<
SendResult
>
promise
=
new
DefaultPromise
<>();
try
{
this
.
rocketmqProducer
.
send
(
rmqMessage
,
new
SendCallback
()
{
@Override
public
void
onSuccess
(
final
org
.
apache
.
rocketmq
.
client
.
producer
.
SendResult
rmqResult
)
{
message
.
headers
().
put
(
MessageHeader
.
MESSAGE_ID
,
rmqResult
.
getMsgId
());
promise
.
set
(
OMSUtil
.
sendResultConvert
(
rmqResult
));
}
@Override
public
void
onException
(
final
Throwable
e
)
{
promise
.
setFailure
(
e
);
}
},
timeout
);
}
catch
(
Exception
e
)
{
promise
.
setFailure
(
e
);
}
return
promise
;
}
@Override
public
void
sendOneway
(
final
Message
message
)
{
checkMessageType
(
message
);
org
.
apache
.
rocketmq
.
common
.
message
.
Message
rmqMessage
=
msgConvert
((
BytesMessage
)
message
);
try
{
this
.
rocketmqProducer
.
sendOneway
(
rmqMessage
);
}
catch
(
Exception
ignore
)
{
//Ignore the oneway exception.
}
}
@Override
public
void
sendOneway
(
final
Message
message
,
final
KeyValue
properties
)
{
sendOneway
(
message
);
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.producer
;
import
io.openmessaging.BytesMessage
;
import
io.openmessaging.KeyValue
;
import
io.openmessaging.Message
;
import
io.openmessaging.MessageHeader
;
import
io.openmessaging.SequenceProducer
;
import
io.openmessaging.rocketmq.OMSUtil
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
org.apache.rocketmq.client.Validators
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.producer.SendResult
;
public
class
SequenceProducerImpl
extends
AbstractOMSProducer
implements
SequenceProducer
{
private
BlockingQueue
<
Message
>
msgCacheQueue
;
public
SequenceProducerImpl
(
final
KeyValue
properties
)
{
super
(
properties
);
this
.
msgCacheQueue
=
new
LinkedBlockingQueue
<>();
}
@Override
public
KeyValue
properties
()
{
return
properties
;
}
@Override
public
void
send
(
final
Message
message
)
{
checkMessageType
(
message
);
org
.
apache
.
rocketmq
.
common
.
message
.
Message
rmqMessage
=
OMSUtil
.
msgConvert
((
BytesMessage
)
message
);
try
{
Validators
.
checkMessage
(
rmqMessage
,
this
.
rocketmqProducer
);
}
catch
(
MQClientException
e
)
{
throw
checkProducerException
(
rmqMessage
.
getTopic
(),
message
.
headers
().
getString
(
MessageHeader
.
MESSAGE_ID
),
e
);
}
msgCacheQueue
.
add
(
message
);
}
@Override
public
void
send
(
final
Message
message
,
final
KeyValue
properties
)
{
send
(
message
);
}
@Override
public
synchronized
void
commit
()
{
List
<
Message
>
messages
=
new
ArrayList
<>();
msgCacheQueue
.
drainTo
(
messages
);
List
<
org
.
apache
.
rocketmq
.
common
.
message
.
Message
>
rmqMessages
=
new
ArrayList
<>();
for
(
Message
message
:
messages
)
{
rmqMessages
.
add
(
OMSUtil
.
msgConvert
((
BytesMessage
)
message
));
}
try
{
SendResult
sendResult
=
this
.
rocketmqProducer
.
send
(
rmqMessages
);
String
[]
msgIdArray
=
sendResult
.
getMsgId
().
split
(
","
);
for
(
int
i
=
0
;
i
<
messages
.
size
();
i
++)
{
Message
message
=
messages
.
get
(
i
);
message
.
headers
().
put
(
MessageHeader
.
MESSAGE_ID
,
msgIdArray
[
i
]);
}
}
catch
(
Exception
e
)
{
throw
checkProducerException
(
""
,
""
,
e
);
}
}
@Override
public
synchronized
void
rollback
()
{
msgCacheQueue
.
clear
();
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.promise
;
import
io.openmessaging.Promise
;
import
io.openmessaging.PromiseListener
;
import
io.openmessaging.exception.OMSRuntimeException
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
public
class
DefaultPromise
<
V
>
implements
Promise
<
V
>
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
DefaultPromise
.
class
);
private
final
Object
lock
=
new
Object
();
private
volatile
FutureState
state
=
FutureState
.
DOING
;
private
V
result
=
null
;
private
long
timeout
;
private
long
createTime
;
private
Throwable
exception
=
null
;
private
List
<
PromiseListener
>
promiseListenerList
;
public
DefaultPromise
()
{
createTime
=
System
.
currentTimeMillis
();
promiseListenerList
=
new
ArrayList
<>();
timeout
=
5000
;
}
@Override
public
boolean
cancel
(
final
boolean
mayInterruptIfRunning
)
{
return
false
;
}
@Override
public
boolean
isCancelled
()
{
return
state
.
isCancelledState
();
}
@Override
public
boolean
isDone
()
{
return
state
.
isDoneState
();
}
@Override
public
V
get
()
{
return
result
;
}
@Override
public
V
get
(
final
long
timeout
)
{
synchronized
(
lock
)
{
if
(!
isDoing
())
{
return
getValueOrThrowable
();
}
if
(
timeout
<=
0
)
{
try
{
lock
.
wait
();
}
catch
(
Exception
e
)
{
cancel
(
e
);
}
return
getValueOrThrowable
();
}
else
{
long
waitTime
=
timeout
-
(
System
.
currentTimeMillis
()
-
createTime
);
if
(
waitTime
>
0
)
{
for
(;
;
)
{
try
{
lock
.
wait
(
waitTime
);
}
catch
(
InterruptedException
e
)
{
LOG
.
error
(
"promise get value interrupted,excepiton:{}"
,
e
.
getMessage
());
}
if
(!
isDoing
())
{
break
;
}
else
{
waitTime
=
timeout
-
(
System
.
currentTimeMillis
()
-
createTime
);
if
(
waitTime
<=
0
)
{
break
;
}
}
}
}
if
(
isDoing
())
{
timeoutSoCancel
();
}
}
return
getValueOrThrowable
();
}
}
@Override
public
boolean
set
(
final
V
value
)
{
if
(
value
==
null
)
return
false
;
this
.
result
=
value
;
return
done
();
}
@Override
public
boolean
setFailure
(
final
Throwable
cause
)
{
if
(
cause
==
null
)
return
false
;
this
.
exception
=
cause
;
return
done
();
}
@Override
public
void
addListener
(
final
PromiseListener
listener
)
{
if
(
listener
==
null
)
{
throw
new
NullPointerException
(
"FutureListener is null"
);
}
boolean
notifyNow
=
false
;
synchronized
(
lock
)
{
if
(!
isDoing
())
{
notifyNow
=
true
;
}
else
{
if
(
promiseListenerList
==
null
)
{
promiseListenerList
=
new
ArrayList
<>();
}
promiseListenerList
.
add
(
listener
);
}
}
if
(
notifyNow
)
{
notifyListener
(
listener
);
}
}
@Override
public
Throwable
getThrowable
()
{
return
exception
;
}
private
void
notifyListeners
()
{
if
(
promiseListenerList
!=
null
)
{
for
(
PromiseListener
listener
:
promiseListenerList
)
{
notifyListener
(
listener
);
}
}
}
private
boolean
isSuccess
()
{
return
isDone
()
&&
(
exception
==
null
);
}
private
void
timeoutSoCancel
()
{
synchronized
(
lock
)
{
if
(!
isDoing
())
{
return
;
}
state
=
FutureState
.
CANCELLED
;
exception
=
new
RuntimeException
(
"get request result is timeout or interrupted"
);
lock
.
notifyAll
();
}
notifyListeners
();
}
private
V
getValueOrThrowable
()
{
if
(
exception
!=
null
)
{
Throwable
e
=
exception
.
getCause
()
!=
null
?
exception
.
getCause
()
:
exception
;
throw
new
OMSRuntimeException
(
"-1"
,
e
);
}
notifyListeners
();
return
result
;
}
private
boolean
isDoing
()
{
return
state
.
isDoingState
();
}
private
boolean
done
()
{
synchronized
(
lock
)
{
if
(!
isDoing
())
{
return
false
;
}
state
=
FutureState
.
DONE
;
lock
.
notifyAll
();
}
notifyListeners
();
return
true
;
}
private
void
notifyListener
(
final
PromiseListener
<
V
>
listener
)
{
try
{
if
(
exception
!=
null
)
listener
.
operationFailed
(
this
);
else
listener
.
operationCompleted
(
this
);
}
catch
(
Throwable
t
)
{
LOG
.
error
(
"notifyListener {} Error:{}"
,
listener
.
getClass
().
getSimpleName
(),
t
);
}
}
private
boolean
cancel
(
Exception
e
)
{
synchronized
(
lock
)
{
if
(!
isDoing
())
{
return
false
;
}
state
=
FutureState
.
CANCELLED
;
exception
=
e
;
lock
.
notifyAll
();
}
notifyListeners
();
return
true
;
}
}
openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
0 → 100644
浏览文件 @
ce146934
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
io.openmessaging.rocketmq.promise
;
public
enum
FutureState
{
/** the task is doing **/
DOING
(
0
),
/** the task is done **/
DONE
(
1
),
/** ths task is cancelled **/
CANCELLED
(
2
);
public
final
int
value
;
private
FutureState
(
int
value
)
{
this
.
value
=
value
;
}
public
boolean
isCancelledState
()
{
return
this
==
CANCELLED
;
}
public
boolean
isDoneState
()
{
return
this
==
DONE
;
}
public
boolean
isDoingState
()
{
return
this
==
DOING
;
}
}
pom.xml
浏览文件 @
ce146934
...
...
@@ -606,8 +606,8 @@
</dependency>
<dependency>
<groupId>
io.openmessaging
</groupId>
<artifactId>
messaging-user-level
-api
</artifactId>
<version>
1.0.0-SNAPSHOT
</version>
<artifactId>
openmessaging
-api
</artifactId>
<version>
0.1.0-beta
</version>
</dependency>
</dependencies>
</dependencyManagement>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录