package org.maxkey.identity.kafka;

import java.util.UUID;

import org.maxkey.configuration.ApplicationConfig;
import org.maxkey.util.DateUtils;
import org.maxkey.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProvisioningService {
    
    private static final Logger _logger = LoggerFactory.getLogger(KafkaProvisioningService.class);
    
    @Autowired
    protected ApplicationConfig applicationConfig;
    
    @Autowired
    protected KafkaTemplate<String, String> kafkaTemplate;

    public void setApplicationConfig(ApplicationConfig applicationConfig) {
        this.applicationConfig = applicationConfig;
    }

    public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void send(String topic,Object content,String actionType) {
        if(applicationConfig.isKafkaSupport()) {
            KafkaMessage message = new KafkaMessage();
            message.setMsgId(UUID.randomUUID().toString());
            message.setActionType(actionType);
            message.setTopic(topic);
            message.setSendTime(DateUtils.getCurrentDateTimeAsString());
            message.setContent(JsonUtils.gson2Json(content));
            String msg = JsonUtils.gson2Json(message);
            _logger.info("send  message = {}", msg);
43 44 45
            //通过线程发送Kafka消息
            KafkaProvisioningThread thread = 
                    new  KafkaProvisioningThread(kafkaTemplate,topic,msg);
46
            
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
            thread.start();
        }
    }
    
    
    
    class KafkaProvisioningThread extends Thread{

        KafkaTemplate<String, String> kafkaTemplate;
        
        String topic ;
        
        String msg;
        
        public KafkaProvisioningThread(KafkaTemplate<String, String> kafkaTemplate, String topic, String msg) {
            this.kafkaTemplate = kafkaTemplate;
            this.topic = topic;
            this.msg = msg;
        }

        @Override
        public void run() {
69 70
            kafkaTemplate.send(topic, msg);
        }
71

72 73 74
    }
    
}