package com.xiaojukeji.know.streaming.km.task.kafka.metadata;

import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
import org.apache.kafka.common.acl.AclBinding;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

@Task(name = "SyncKafkaAclTask",
15
        description = "KafkaAcl信息同步到DB",
Z
zengqiao 已提交
16 17 18 19
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
20
public class SyncKafkaAclTask extends AbstractAsyncMetadataDispatchTask {
Z
zengqiao 已提交
21 22 23 24
    @Autowired
    private KafkaAclService kafkaAclService;

    @Override
25
    public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
26
        Result<List<AclBinding>> aclBindingListResult = kafkaAclService.getDataFromKafka(clusterPhy);
Z
zengqiao 已提交
27 28 29 30
        if (aclBindingListResult.failed()) {
            return TaskResult.FAIL;
        }

31
        kafkaAclService.writeToDB(clusterPhy.getId(), aclBindingListResult.getData());
Z
zengqiao 已提交
32 33 34 35

        return TaskResult.SUCCESS;
    }
}