StreamModuleInstaller.java 3.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

P
pengys5 已提交
19 20
package org.skywalking.apm.collector.stream;

P
pengys5 已提交
21
import java.util.LinkedList;
P
pengys5 已提交
22 23
import java.util.List;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
P
pengys5 已提交
24
import org.skywalking.apm.collector.core.framework.Context;
P
pengys5 已提交
25
import org.skywalking.apm.collector.core.framework.DefineException;
P
pengys5 已提交
26
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
P
pengys5 已提交
27
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
P
pengys5 已提交
28 29 30 31 32 33 34 35 36 37 38 39
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.LocalAsyncWorkerProviderDefineLoader;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.RemoteWorkerProviderDefineLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author pengys5
 */
P
pengys5 已提交
40
public class StreamModuleInstaller extends SingleModuleInstaller {
P
pengys5 已提交
41 42 43

    private final Logger logger = LoggerFactory.getLogger(StreamModuleInstaller.class);

P
pengys5 已提交
44 45 46 47 48 49 50 51
    @Override public String groupName() {
        return StreamModuleGroupDefine.GROUP_NAME;
    }

    @Override public Context moduleContext() {
        return new StreamModuleContext(groupName());
    }

P
pengys5 已提交
52 53 54 55 56 57 58
    @Override public List<String> dependenceModules() {
        List<String> dependenceModules = new LinkedList<>();
        dependenceModules.add(QueueModuleGroupDefine.GROUP_NAME);
        return dependenceModules;
    }

    @Override public void onAfterInstall() throws DefineException {
P
pengys5 已提交
59
        initializeWorker((StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName()));
P
pengys5 已提交
60 61
    }

P
pengys5 已提交
62
    private void initializeWorker(StreamModuleContext context) throws DefineException {
P
pengys5 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
        ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext();
        context.setClusterWorkerContext(clusterWorkerContext);

        LocalAsyncWorkerProviderDefineLoader localAsyncProviderLoader = new LocalAsyncWorkerProviderDefineLoader();
        RemoteWorkerProviderDefineLoader remoteProviderLoader = new RemoteWorkerProviderDefineLoader();
        try {
            List<AbstractLocalAsyncWorkerProvider> localAsyncProviders = localAsyncProviderLoader.load();
            for (AbstractLocalAsyncWorkerProvider provider : localAsyncProviders) {
                provider.setClusterContext(clusterWorkerContext);
                provider.create();
                clusterWorkerContext.putRole(provider.role());
            }

            List<AbstractRemoteWorkerProvider> remoteProviders = remoteProviderLoader.load();
            for (AbstractRemoteWorkerProvider provider : remoteProviders) {
                provider.setClusterContext(clusterWorkerContext);
                clusterWorkerContext.putRole(provider.role());
P
pengys5 已提交
80
                clusterWorkerContext.putProvider(provider);
P
pengys5 已提交
81
            }
P
pengys5 已提交
82
        } catch (ProviderNotFoundException e) {
P
pengys5 已提交
83 84 85 86
            logger.error(e.getMessage(), e);
        }
    }
}