From 76da1b07ecebce2097f66cec7bc5b05b8914fb1e Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Fri, 3 Nov 2017 17:02:40 +0800 Subject: [PATCH] Provide queue module with disruptor and data carrier providers. --- apm-collector/apm-collector-boot/pom.xml | 12 +++ apm-collector/apm-collector-component/pom.xml | 1 - .../pom.xml | 7 ++ .../QueueModuleDataCarrierProvider.java | 57 ++++++++++++++ .../DataCarrierQueueCreatorService.java | 33 ++++++++ ...g.apm.collector.core.module.ModuleProvider | 19 +++++ .../apm/collector/queue/QueueModule.java | 38 ++++++++++ .../queue/base/DaemonThreadFactory.java | 35 +++++++++ .../queue/base/EndOfBatchCommand.java | 25 ++++++ .../collector/queue/base/MessageHolder.java | 38 ++++++++++ .../collector/queue/base/QueueCreator.java | 26 +++++++ .../queue/base/QueueEventHandler.java | 26 +++++++ .../collector/queue/base/QueueExecutor.java | 26 +++++++ .../queue/service/QueueCreatorService.java | 30 ++++++++ ...kywalking.apm.collector.core.module.Module | 19 +++++ .../pom.xml | 12 +++ .../QueueModuleDisruptorProvider.java | 57 ++++++++++++++ .../disruptor/base/DisruptorEventHandler.java | 76 +++++++++++++++++++ .../disruptor/base/DisruptorQueueCreator.java | 53 +++++++++++++ .../disruptor/base/MessageHolderFactory.java | 34 +++++++++ .../service/DisruptorQueueCreatorService.java | 32 ++++++++ ...g.apm.collector.core.module.ModuleProvider | 19 +++++ apm-collector/apm-collector-queue/pom.xml | 8 +- 23 files changed, 681 insertions(+), 2 deletions(-) create mode 100644 apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueModuleDataCarrierProvider.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/service/DataCarrierQueueCreatorService.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/QueueModule.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/DaemonThreadFactory.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/EndOfBatchCommand.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueCreator.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-define/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.Module create mode 100644 apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueModuleDisruptorProvider.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/MessageHolderFactory.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/service/DisruptorQueueCreatorService.java create mode 100644 apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider diff --git a/apm-collector/apm-collector-boot/pom.xml b/apm-collector/apm-collector-boot/pom.xml index 8dc67d1d87..69f7de2dc7 100644 --- a/apm-collector/apm-collector-boot/pom.xml +++ b/apm-collector/apm-collector-boot/pom.xml @@ -117,5 +117,17 @@ ${project.version} + + + org.skywalking + collector-queue-datacarrier-provider + ${project.version} + + + org.skywalking + collector-queue-disruptor-provider + ${project.version} + + \ No newline at end of file diff --git a/apm-collector/apm-collector-component/pom.xml b/apm-collector/apm-collector-component/pom.xml index 2bf606fb38..31317c6447 100644 --- a/apm-collector/apm-collector-component/pom.xml +++ b/apm-collector/apm-collector-component/pom.xml @@ -14,7 +14,6 @@ client-component server-component - queue-component diff --git a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/pom.xml b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/pom.xml index 6436a96c43..fcb89d4d59 100644 --- a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/pom.xml +++ b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/pom.xml @@ -30,4 +30,11 @@ collector-queue-datacarrier-provider jar + + + org.skywalking + collector-queue-define + ${project.version} + + \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueModuleDataCarrierProvider.java b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueModuleDataCarrierProvider.java new file mode 100644 index 0000000000..5e63afbee0 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueModuleDataCarrierProvider.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.datacarrier; + +import java.util.Properties; +import org.skywalking.apm.collector.core.module.Module; +import org.skywalking.apm.collector.core.module.ModuleProvider; +import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; +import org.skywalking.apm.collector.queue.QueueModule; +import org.skywalking.apm.collector.queue.datacarrier.service.DataCarrierQueueCreatorService; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class QueueModuleDataCarrierProvider extends ModuleProvider { + + @Override public String name() { + return "datacarrier"; + } + + @Override public Class module() { + return QueueModule.class; + } + + @Override public void prepare(Properties config) throws ServiceNotProvidedException { + this.registerServiceImplementation(QueueCreatorService.class, new DataCarrierQueueCreatorService()); + } + + @Override public void start(Properties config) throws ServiceNotProvidedException { + + } + + @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { + + } + + @Override public String[] requiredModules() { + return new String[0]; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/service/DataCarrierQueueCreatorService.java b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/service/DataCarrierQueueCreatorService.java new file mode 100644 index 0000000000..c6e11480f5 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/service/DataCarrierQueueCreatorService.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.datacarrier.service; + +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class DataCarrierQueueCreatorService implements QueueCreatorService { + + @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) { + return null; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider new file mode 100644 index 0000000000..8ae1fd21d4 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Copyright 2017, OpenSkywalking Organization All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Project repository: https://github.com/OpenSkywalking/skywalking +# + +org.skywalking.apm.collector.queue.datacarrier.QueueModuleDataCarrierProvider \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/QueueModule.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/QueueModule.java new file mode 100644 index 0000000000..bab6b027c1 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/QueueModule.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue; + +import org.skywalking.apm.collector.core.module.Module; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class QueueModule extends Module { + + public static final String NAME = "queue"; + + @Override public String name() { + return NAME; + } + + @Override public Class[] services() { + return new Class[] {QueueCreatorService.class}; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/DaemonThreadFactory.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/DaemonThreadFactory.java new file mode 100644 index 0000000000..08d79b53ec --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/DaemonThreadFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +import java.util.concurrent.ThreadFactory; + +/** + * @author peng-yongsheng + */ +public enum DaemonThreadFactory implements ThreadFactory { + INSTANCE; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setDaemon(true); + return t; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/EndOfBatchCommand.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/EndOfBatchCommand.java new file mode 100644 index 0000000000..1b67f43ffc --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/EndOfBatchCommand.java @@ -0,0 +1,25 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public class EndOfBatchCommand { +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java new file mode 100644 index 0000000000..00d8b4797c --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public class MessageHolder { + private Object message; + + public Object getMessage() { + return message; + } + + public void setMessage(Object message) { + this.message = message; + } + + public void reset() { + message = null; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueCreator.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueCreator.java new file mode 100644 index 0000000000..7527e8aa5c --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueCreator.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public interface QueueCreator { + QueueEventHandler create(int queueSize, QueueExecutor executor); +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java new file mode 100644 index 0000000000..7467dbc2ff --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public interface QueueEventHandler { + void tell(Object message); +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java new file mode 100644 index 0000000000..eabf2c415a --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public interface QueueExecutor { + void execute(Object message); +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java new file mode 100644 index 0000000000..8860878d32 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.service; + +import org.skywalking.apm.collector.core.module.Service; +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; + +/** + * @author peng-yongsheng + */ +public interface QueueCreatorService extends Service { + QueueEventHandler create(int queueSize, QueueExecutor executor); +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.Module b/apm-collector/apm-collector-queue/collector-queue-define/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.Module new file mode 100644 index 0000000000..8e26eed9c8 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.Module @@ -0,0 +1,19 @@ +# +# Copyright 2017, OpenSkywalking Organization All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Project repository: https://github.com/OpenSkywalking/skywalking +# + +org.skywalking.apm.collector.queue.QueueModule \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/pom.xml b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/pom.xml index dd2224077c..eac868d49b 100644 --- a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/pom.xml +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/pom.xml @@ -30,4 +30,16 @@ collector-queue-disruptor-provider jar + + + org.skywalking + collector-queue-define + ${project.version} + + + com.lmax + disruptor + 3.3.6 + + \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueModuleDisruptorProvider.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueModuleDisruptorProvider.java new file mode 100644 index 0000000000..a5c85ebc85 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueModuleDisruptorProvider.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor; + +import java.util.Properties; +import org.skywalking.apm.collector.core.module.Module; +import org.skywalking.apm.collector.core.module.ModuleProvider; +import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; +import org.skywalking.apm.collector.queue.QueueModule; +import org.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class QueueModuleDisruptorProvider extends ModuleProvider { + + @Override public String name() { + return "disruptor"; + } + + @Override public Class module() { + return QueueModule.class; + } + + @Override public void prepare(Properties config) throws ServiceNotProvidedException { + this.registerServiceImplementation(QueueCreatorService.class, new DisruptorQueueCreatorService()); + } + + @Override public void start(Properties config) throws ServiceNotProvidedException { + + } + + @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { + + } + + @Override public String[] requiredModules() { + return new String[0]; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java new file mode 100644 index 0000000000..447c3bc2db --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java @@ -0,0 +1,76 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor.base; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import org.skywalking.apm.collector.queue.base.EndOfBatchCommand; +import org.skywalking.apm.collector.queue.base.MessageHolder; +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class DisruptorEventHandler implements EventHandler, QueueEventHandler { + + private final Logger logger = LoggerFactory.getLogger(DisruptorEventHandler.class); + + private RingBuffer ringBuffer; + private QueueExecutor executor; + + DisruptorEventHandler(RingBuffer ringBuffer, QueueExecutor executor) { + this.ringBuffer = ringBuffer; + this.executor = executor; + } + + /** + * Receive the message from disruptor, when message in disruptor is empty, then send the cached data + * to the next workers. + * + * @param event published to the {@link RingBuffer} + * @param sequence of the event being processed + * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer} + */ + public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) { + Object message = event.getMessage(); + event.reset(); + + executor.execute(message); + if (endOfBatch) { + executor.execute(new EndOfBatchCommand()); + } + } + + /** + * Push the message into disruptor ring buffer. + * + * @param message of the data to process. + */ + public void tell(Object message) { + long sequence = ringBuffer.next(); + try { + ringBuffer.get(sequence).setMessage(message); + } finally { + ringBuffer.publish(sequence); + } + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java new file mode 100644 index 0000000000..f2834709ed --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor.base; + +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import org.skywalking.apm.collector.queue.base.DaemonThreadFactory; +import org.skywalking.apm.collector.queue.base.MessageHolder; +import org.skywalking.apm.collector.queue.base.QueueCreator; +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; + +/** + * @author peng-yongsheng + */ +public class DisruptorQueueCreator implements QueueCreator { + + @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) { + // Specify the size of the ring buffer, must be power of 2. + if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) { + throw new IllegalArgumentException("queue size must be power of 2"); + } + + // Construct the Disruptor + Disruptor disruptor = new Disruptor(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE); + + RingBuffer ringBuffer = disruptor.getRingBuffer(); + DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor); + + // Connect the handler + disruptor.handleEventsWith(eventHandler); + + // Start the Disruptor, starts all threads running + disruptor.start(); + return eventHandler; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/MessageHolderFactory.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/MessageHolderFactory.java new file mode 100644 index 0000000000..9d5e1a54d7 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/MessageHolderFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor.base; + +import com.lmax.disruptor.EventFactory; +import org.skywalking.apm.collector.queue.base.MessageHolder; + +/** + * @author peng-yongsheng + */ +public class MessageHolderFactory implements EventFactory { + + public static MessageHolderFactory INSTANCE = new MessageHolderFactory(); + + public MessageHolder newInstance() { + return new MessageHolder(); + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/service/DisruptorQueueCreatorService.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/service/DisruptorQueueCreatorService.java new file mode 100644 index 0000000000..52021f5c50 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/service/DisruptorQueueCreatorService.java @@ -0,0 +1,32 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor.service; + +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class DisruptorQueueCreatorService implements QueueCreatorService { + @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) { + return null; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider new file mode 100644 index 0000000000..080bba992c --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Copyright 2017, OpenSkywalking Organization All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Project repository: https://github.com/OpenSkywalking/skywalking +# + +org.skywalking.apm.collector.queue.disruptor.QueueModuleDisruptorProvider \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/pom.xml b/apm-collector/apm-collector-queue/pom.xml index 5735adc520..9f80371ef7 100644 --- a/apm-collector/apm-collector-queue/pom.xml +++ b/apm-collector/apm-collector-queue/pom.xml @@ -35,5 +35,11 @@ collector-queue-disruptor-provider - + + + org.skywalking + apm-collector-core + ${project.version} + + \ No newline at end of file -- GitLab