GatewayHeartbeatController.java 2.7 KB
Newer Older
Z
zengqiao 已提交
1 2
package com.xiaojukeji.kafka.manager.web.api.versionone.gateway;

Z
zengqiao 已提交
3
import com.alibaba.fastjson.JSONObject;
Z
zengqiao 已提交
4 5
import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel;
import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent;
Z
zengqiao 已提交
6
import com.xiaojukeji.kafka.manager.common.entity.Result;
Z
zengqiao 已提交
7
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
Z
zengqiao 已提交
8
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
Z
zengqiao 已提交
9 10 11 12 13 14 15 16 17 18
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

Z
zengqiao 已提交
19 20
import java.util.List;

Z
zengqiao 已提交
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
/**
 * @author zengqiao
 * @date 20/7/6
 */
@Api(tags = "GATEWAY-WEB相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.GATEWAY_API_V1_PREFIX)
public class GatewayHeartbeatController {
    private final static Logger LOGGER = LoggerFactory.getLogger(GatewayHeartbeatController.class);

    @Autowired
    private TopicConnectionService topicConnectionService;

    @ApiLevel(level = ApiLevelContent.LEVEL_NORMAL_3)
    @ApiOperation(value = "连接信息上报入口", notes = "Broker主动上报信息")
    @RequestMapping(value = "heartbeat/survive-user", method = RequestMethod.POST)
    @ResponseBody
Z
zengqiao 已提交
38 39 40
    public Result receiveTopicConnections(@RequestParam("clusterId") Long clusterId,
                                          @RequestParam("brokerId") Integer brokerId,
                                          @RequestBody JSONObject jsonObject) {
Z
zengqiao 已提交
41 42
        if (ValidateUtils.isNull(jsonObject) || jsonObject.isEmpty()) {
            LOGGER.info("class=GatewayHeartbeatController||method=receiveTopicConnections||clusterId={}||brokerId={}||msg=connections empty!", clusterId, brokerId);
Z
zengqiao 已提交
43
            return Result.buildSuc();
Z
zengqiao 已提交
44 45 46 47 48 49
        }

        LOGGER.info("class=GatewayHeartbeatController||method=receiveTopicConnections||clusterId={}||brokerId={}||size={}||msg=receive connections", clusterId, brokerId, jsonObject.size());

        List<TopicConnectionDO> doList = null;
        try {
Z
zengqiao 已提交
50
            doList = JsonUtils.parseTopicConnections(clusterId, jsonObject, System.currentTimeMillis());
Z
zengqiao 已提交
51
        } catch (Exception e) {
Z
zengqiao 已提交
52 53
            LOGGER.error("class=GatewayHeartbeatController||method=receiveTopicConnections||clusterId={}||brokerId={}||msg=parse data failed||exception={}", clusterId, brokerId, e.getMessage());
            return Result.buildFailure("fail");
Z
zengqiao 已提交
54
        }
Z
zengqiao 已提交
55 56 57

        topicConnectionService.batchAdd(doList);
        return Result.buildSuc();
Z
zengqiao 已提交
58 59
    }
}