diff --git a/apm-collector/apm-collector-boot/pom.xml b/apm-collector/apm-collector-boot/pom.xml index 8dc67d1d876296f4f1ed7f5f4fc03b289cac3ec9..69f7de2dc77373bd5e0bd1f12692aab54b61e4c0 100644 --- a/apm-collector/apm-collector-boot/pom.xml +++ b/apm-collector/apm-collector-boot/pom.xml @@ -117,5 +117,17 @@ ${project.version} + + + org.skywalking + collector-queue-datacarrier-provider + ${project.version} + + + org.skywalking + collector-queue-disruptor-provider + ${project.version} + + \ No newline at end of file diff --git a/apm-collector/apm-collector-component/pom.xml b/apm-collector/apm-collector-component/pom.xml index 2bf606fb384f15bc9707a43b2b48fb1bdbf46b71..31317c6447371af1994a9d309885f9a57e3359af 100644 --- a/apm-collector/apm-collector-component/pom.xml +++ b/apm-collector/apm-collector-component/pom.xml @@ -14,7 +14,6 @@ client-component server-component - queue-component diff --git a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/pom.xml b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/pom.xml index 6436a96c433f328fdf339e226f89c9f500186b50..fcb89d4d59db2ffb58df9a00fce4078c2a12d4bd 100644 --- a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/pom.xml +++ b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/pom.xml @@ -30,4 +30,11 @@ collector-queue-datacarrier-provider jar + + + org.skywalking + collector-queue-define + ${project.version} + + \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueModuleDataCarrierProvider.java b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueModuleDataCarrierProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..5e63afbee0e914ac0c7780dcd4e0e163a1eb18c2 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueModuleDataCarrierProvider.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.datacarrier; + +import java.util.Properties; +import org.skywalking.apm.collector.core.module.Module; +import org.skywalking.apm.collector.core.module.ModuleProvider; +import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; +import org.skywalking.apm.collector.queue.QueueModule; +import org.skywalking.apm.collector.queue.datacarrier.service.DataCarrierQueueCreatorService; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class QueueModuleDataCarrierProvider extends ModuleProvider { + + @Override public String name() { + return "datacarrier"; + } + + @Override public Class module() { + return QueueModule.class; + } + + @Override public void prepare(Properties config) throws ServiceNotProvidedException { + this.registerServiceImplementation(QueueCreatorService.class, new DataCarrierQueueCreatorService()); + } + + @Override public void start(Properties config) throws ServiceNotProvidedException { + + } + + @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { + + } + + @Override public String[] requiredModules() { + return new String[0]; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/service/DataCarrierQueueCreatorService.java b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/service/DataCarrierQueueCreatorService.java new file mode 100644 index 0000000000000000000000000000000000000000..c6e11480f5136da0cbedb888df35d9a2f8e158fd --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/java/org/skywalking/apm/collector/queue/datacarrier/service/DataCarrierQueueCreatorService.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.datacarrier.service; + +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class DataCarrierQueueCreatorService implements QueueCreatorService { + + @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) { + return null; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider new file mode 100644 index 0000000000000000000000000000000000000000..8ae1fd21d4dbd782928c38f261aba39a1aea67d7 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-datacarrier-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Copyright 2017, OpenSkywalking Organization All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Project repository: https://github.com/OpenSkywalking/skywalking +# + +org.skywalking.apm.collector.queue.datacarrier.QueueModuleDataCarrierProvider \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/QueueModule.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/QueueModule.java new file mode 100644 index 0000000000000000000000000000000000000000..bab6b027c100e007ac9e37d05668fde6b1d3ba9a --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/QueueModule.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue; + +import org.skywalking.apm.collector.core.module.Module; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class QueueModule extends Module { + + public static final String NAME = "queue"; + + @Override public String name() { + return NAME; + } + + @Override public Class[] services() { + return new Class[] {QueueCreatorService.class}; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/DaemonThreadFactory.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/DaemonThreadFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..08d79b53ec500d047cec987b0a830cf00ec7cf9f --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/DaemonThreadFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +import java.util.concurrent.ThreadFactory; + +/** + * @author peng-yongsheng + */ +public enum DaemonThreadFactory implements ThreadFactory { + INSTANCE; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setDaemon(true); + return t; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/EndOfBatchCommand.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/EndOfBatchCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..1b67f43ffcbeda1b9257d7f88a40f8c8dbe75619 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/EndOfBatchCommand.java @@ -0,0 +1,25 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public class EndOfBatchCommand { +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java new file mode 100644 index 0000000000000000000000000000000000000000..00d8b4797cb3793c3d374a358d623c7b013deb58 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public class MessageHolder { + private Object message; + + public Object getMessage() { + return message; + } + + public void setMessage(Object message) { + this.message = message; + } + + public void reset() { + message = null; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueCreator.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueCreator.java new file mode 100644 index 0000000000000000000000000000000000000000..7527e8aa5c3ca6f0298dbc8cede7263fc96f73b7 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueCreator.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public interface QueueCreator { + QueueEventHandler create(int queueSize, QueueExecutor executor); +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..7467dbc2ff4b0c41552308796d375eef7fdcb6fd --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public interface QueueEventHandler { + void tell(Object message); +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..eabf2c415a6fd1d28236fefc8bcd94bf547e1f14 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.base; + +/** + * @author peng-yongsheng + */ +public interface QueueExecutor { + void execute(Object message); +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java new file mode 100644 index 0000000000000000000000000000000000000000..8860878d3204cd006d04516f2a2b08c44c71281c --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.service; + +import org.skywalking.apm.collector.core.module.Service; +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; + +/** + * @author peng-yongsheng + */ +public interface QueueCreatorService extends Service { + QueueEventHandler create(int queueSize, QueueExecutor executor); +} diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.Module b/apm-collector/apm-collector-queue/collector-queue-define/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.Module new file mode 100644 index 0000000000000000000000000000000000000000..8e26eed9c85736ff5a5f057e2f579b4f8dddf471 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.Module @@ -0,0 +1,19 @@ +# +# Copyright 2017, OpenSkywalking Organization All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Project repository: https://github.com/OpenSkywalking/skywalking +# + +org.skywalking.apm.collector.queue.QueueModule \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/pom.xml b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/pom.xml index dd2224077c8a958f414f17ca0d80fcb12116f4a6..eac868d49bf633a5787bdeace7afc1e403286ec9 100644 --- a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/pom.xml +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/pom.xml @@ -30,4 +30,16 @@ collector-queue-disruptor-provider jar + + + org.skywalking + collector-queue-define + ${project.version} + + + com.lmax + disruptor + 3.3.6 + + \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueModuleDisruptorProvider.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueModuleDisruptorProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..a5c85ebc85bdbd71993b2da4948a5191e0f5a650 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueModuleDisruptorProvider.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor; + +import java.util.Properties; +import org.skywalking.apm.collector.core.module.Module; +import org.skywalking.apm.collector.core.module.ModuleProvider; +import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; +import org.skywalking.apm.collector.queue.QueueModule; +import org.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class QueueModuleDisruptorProvider extends ModuleProvider { + + @Override public String name() { + return "disruptor"; + } + + @Override public Class module() { + return QueueModule.class; + } + + @Override public void prepare(Properties config) throws ServiceNotProvidedException { + this.registerServiceImplementation(QueueCreatorService.class, new DisruptorQueueCreatorService()); + } + + @Override public void start(Properties config) throws ServiceNotProvidedException { + + } + + @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { + + } + + @Override public String[] requiredModules() { + return new String[0]; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..447c3bc2db1f240744e5bca8d506c942b2442612 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java @@ -0,0 +1,76 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor.base; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import org.skywalking.apm.collector.queue.base.EndOfBatchCommand; +import org.skywalking.apm.collector.queue.base.MessageHolder; +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class DisruptorEventHandler implements EventHandler, QueueEventHandler { + + private final Logger logger = LoggerFactory.getLogger(DisruptorEventHandler.class); + + private RingBuffer ringBuffer; + private QueueExecutor executor; + + DisruptorEventHandler(RingBuffer ringBuffer, QueueExecutor executor) { + this.ringBuffer = ringBuffer; + this.executor = executor; + } + + /** + * Receive the message from disruptor, when message in disruptor is empty, then send the cached data + * to the next workers. + * + * @param event published to the {@link RingBuffer} + * @param sequence of the event being processed + * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer} + */ + public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) { + Object message = event.getMessage(); + event.reset(); + + executor.execute(message); + if (endOfBatch) { + executor.execute(new EndOfBatchCommand()); + } + } + + /** + * Push the message into disruptor ring buffer. + * + * @param message of the data to process. + */ + public void tell(Object message) { + long sequence = ringBuffer.next(); + try { + ringBuffer.get(sequence).setMessage(message); + } finally { + ringBuffer.publish(sequence); + } + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java new file mode 100644 index 0000000000000000000000000000000000000000..f2834709ed9c2923b4ffee284ac7e6c99e8bcdae --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor.base; + +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import org.skywalking.apm.collector.queue.base.DaemonThreadFactory; +import org.skywalking.apm.collector.queue.base.MessageHolder; +import org.skywalking.apm.collector.queue.base.QueueCreator; +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; + +/** + * @author peng-yongsheng + */ +public class DisruptorQueueCreator implements QueueCreator { + + @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) { + // Specify the size of the ring buffer, must be power of 2. + if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) { + throw new IllegalArgumentException("queue size must be power of 2"); + } + + // Construct the Disruptor + Disruptor disruptor = new Disruptor(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE); + + RingBuffer ringBuffer = disruptor.getRingBuffer(); + DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor); + + // Connect the handler + disruptor.handleEventsWith(eventHandler); + + // Start the Disruptor, starts all threads running + disruptor.start(); + return eventHandler; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/MessageHolderFactory.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/MessageHolderFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..9d5e1a54d7ccebd823183affa54456b6f72b80ac --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/MessageHolderFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor.base; + +import com.lmax.disruptor.EventFactory; +import org.skywalking.apm.collector.queue.base.MessageHolder; + +/** + * @author peng-yongsheng + */ +public class MessageHolderFactory implements EventFactory { + + public static MessageHolderFactory INSTANCE = new MessageHolderFactory(); + + public MessageHolder newInstance() { + return new MessageHolder(); + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/service/DisruptorQueueCreatorService.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/service/DisruptorQueueCreatorService.java new file mode 100644 index 0000000000000000000000000000000000000000..52021f5c50fc3c35f59a080b9210e8a882ad9f1d --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/service/DisruptorQueueCreatorService.java @@ -0,0 +1,32 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.queue.disruptor.service; + +import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; + +/** + * @author peng-yongsheng + */ +public class DisruptorQueueCreatorService implements QueueCreatorService { + @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) { + return null; + } +} diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider new file mode 100644 index 0000000000000000000000000000000000000000..080bba992cfd304e3e51eee12249183a145a2d93 --- /dev/null +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/resources/META-INF/services/org.skywalking.apm.collector.core.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Copyright 2017, OpenSkywalking Organization All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Project repository: https://github.com/OpenSkywalking/skywalking +# + +org.skywalking.apm.collector.queue.disruptor.QueueModuleDisruptorProvider \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/pom.xml b/apm-collector/apm-collector-queue/pom.xml index 5735adc520bb949cd7565db2b3412c54a4d71708..9f80371ef7dee4f077222b863a2f2ce57e5e4fc3 100644 --- a/apm-collector/apm-collector-queue/pom.xml +++ b/apm-collector/apm-collector-queue/pom.xml @@ -35,5 +35,11 @@ collector-queue-disruptor-provider - + + + org.skywalking + apm-collector-core + ${project.version} + + \ No newline at end of file