RyTask.java 7.2 KB
Newer Older
1 2
package com.linkwechat.quartz.task;

3 4
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject;
1
18356073052 已提交
5
import com.fasterxml.jackson.core.JsonProcessingException;
6 7
import com.linkwechat.common.constant.WeConstans;
import com.linkwechat.common.core.elasticsearch.ElasticSearch;
S
sunxiwang 已提交
8
import com.linkwechat.common.core.redis.RedisCache;
9
import com.linkwechat.common.utils.StringUtils;
10
import com.linkwechat.wecom.domain.WeCustomer;
1
18356073052 已提交
11 12 13
import com.linkwechat.wecom.domain.WeCustomerMessageTimeTask;
import com.linkwechat.wecom.mapper.WeCustomerMessageTimeTaskMapper;
import com.linkwechat.wecom.service.*;
14 15
import com.tencent.wework.FinanceUtils;
import lombok.extern.slf4j.Slf4j;
16
import org.apache.commons.collections4.CollectionUtils;
17 18 19 20 21
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
S
fix  
sunxiwang 已提交
22
import org.springframework.beans.factory.annotation.Value;
23 24 25 26
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
S
sunxiwang 已提交
27
import java.util.Optional;
1
18356073052 已提交
28
import java.util.concurrent.Semaphore;
29
import java.util.concurrent.atomic.AtomicLong;
30 31 32

/**
 * 定时任务调度测试
33
 *
34 35
 * @author ruoyi
 */
36
@Slf4j
37
@Component("ryTask")
38 39 40
public class RyTask {
    @Autowired
    private ElasticSearch elasticSearch;
S
sunxiwang 已提交
41 42 43 44
    @Autowired
    private RedisCache redisCache;
    @Autowired
    private IWeChatContactMappingService weChatContactMappingService;
L
leejoker 已提交
45 46
    @Autowired
    private IWeSensitiveService weSensitiveService;
47 48
    @Autowired
    private IWeSensitiveActHitService weSensitiveActHitService;
49 50
    @Autowired
    private IWeCustomerService weCustomerService;
1
18356073052 已提交
51 52 53 54 55
    @Autowired
    private WeCustomerMessageTimeTaskMapper customerMessageTimeTaskMapper;

    @Autowired
    private IWeCustomerMessageService weCustomerMessageService;
56

S
fix  
sunxiwang 已提交
57 58 59
    @Value("${wecome.chatKey}")
    private String chartKey;

1
18356073052 已提交
60

61
    public void ryMultipleParams(String s, Boolean b, Long l, Double d, Integer i) {
62 63 64
        System.out.println(StringUtils.format("执行多参方法: 字符串类型{},布尔类型{},长整型{},浮点型{},整形{}", s, b, l, d, i));
    }

65
    public void ryParams(String params) {
66 67 68
        System.out.println("执行有参方法:" + params);
    }

69
    public void ryNoParams() {
70 71
        System.out.println("执行无参方法");
    }
72 73 74


    public void FinanceTask(String corpId, String secret) throws IOException {
S
sunxiwang 已提交
75 76
        log.info("执行有参方法: params:{},{}", corpId, secret);
        //创建索引
S
fix  
sunxiwang 已提交
77
        elasticSearch.createIndex2(chartKey, elasticSearch.getFinanceMapping());
S
sunxiwang 已提交
78
        //从缓存中获取消息标识
79 80 81 82

        Object seqObject = Optional.ofNullable(redisCache.getCacheObject(WeConstans.CONTACT_SEQ_KEY)).orElse(0L);
        Long seqLong = Long.valueOf(String.valueOf(seqObject));
        AtomicLong index = new AtomicLong(seqLong);
83
        if (index.get() == 0) {
84
            setRedisCacheSeqValue(index);
S
sunxiwang 已提交
85 86
        }

87
        log.info(">>>>>>>seq:{}", index.get());
88
        FinanceUtils.initSDK(corpId, secret);
S
sunxiwang 已提交
89
        List<JSONObject> chatDataList = FinanceUtils.getChatData(index.get(),
S
sunxiwang 已提交
90
                "",
S
sunxiwang 已提交
91
                "", redisCache);
92
        if (CollectionUtil.isNotEmpty(chatDataList)) {
93
            try {
S
sunxiwang 已提交
94
                List<JSONObject> elasticSearchEntities = weChatContactMappingService.saveWeChatContactMapping(chatDataList);
95 96
                //获取敏感行为命中信息
                weSensitiveActHitService.hitWeSensitiveAct(chatDataList);
S
fix  
sunxiwang 已提交
97
                elasticSearch.insertBatchAsync(chartKey, elasticSearchEntities, weSensitiveService::hitSensitive);
98 99 100 101 102 103
            } catch (Exception e) {
                log.error("消息处理异常:ex:{}", e);
                e.printStackTrace();
            }
        }
    }
104

1
18356073052 已提交
105
    public void WeCustomers() {
106 107
        //查询系统所有客户
        List<WeCustomer> cacheList = redisCache.getCacheList(WeConstans.WECUSTOMERS_KEY);
1
18356073052 已提交
108
        if (CollectionUtils.isEmpty(cacheList)) {
109
            List<WeCustomer> customers = weCustomerService.selectWeCustomerList(null);
1
18356073052 已提交
110 111
            redisCache.setCacheList(WeConstans.WECUSTOMERS_KEY, customers);
        } else {
112 113
            List<WeCustomer> customers = weCustomerService.selectWeCustomerList(null);
            List<WeCustomer> weCustomers = redisCache.getCacheList(WeConstans.WECUSTOMERS_KEY);
1
18356073052 已提交
114 115
            if (CollectionUtils.isNotEmpty(weCustomers) && weCustomers.size() < customers.size()) {
                redisCache.setCacheList(WeConstans.WECUSTOMERS_KEY, customers);
116 117 118 119
            }
        }
    }

120
    private void setRedisCacheSeqValue(AtomicLong index) {
121
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
122
        SortBuilder<?> sortBuilderPrice = SortBuilders.fieldSort(WeConstans.CONTACT_SEQ_KEY).order(SortOrder.DESC);
123 124
        searchSourceBuilder.sort(sortBuilderPrice);
        searchSourceBuilder.size(1);
S
fix  
sunxiwang 已提交
125
        List<JSONObject> searchResultList = elasticSearch.search(chartKey, searchSourceBuilder, JSONObject.class);
126
        searchResultList.stream().findFirst().ifPresent(result -> {
L
leejoker 已提交
127
            index.set(result.getLong(WeConstans.CONTACT_SEQ_KEY) + 1);
128
        });
129
        redisCache.setCacheObject(WeConstans.CONTACT_SEQ_KEY, index);
130
    }
孙喜旺 已提交
131 132 133 134 135

    /**
     * @param corpId 企业id
     * @param secret 会话密钥
     */
136
    public void getPermitUserList(String corpId, String secret) {
孙喜旺 已提交
137 138 139
        log.info("执行有参方法: params:{},{}", corpId, secret);

    }
1
18356073052 已提交
140

1
18356073052 已提交
141 142 143 144
    /**
     * 扫描群发消息定时任务
     */
    public void messageTask() {
1
18356073052 已提交
145 146 147 148
        //获的当前时间的毫秒数
        long currentTime = System.currentTimeMillis();
        //customerMessageTimeTaskMapper
        List<WeCustomerMessageTimeTask> weCustomerMessageTimeTasks = customerMessageTimeTaskMapper.selectWeCustomerMessageTimeTaskGteSettingTime(currentTime);
1
18356073052 已提交
149 150 151 152 153 154 155 156 157 158

        final Semaphore semaphore = new Semaphore(5);

        if (CollectionUtils.isNotEmpty(weCustomerMessageTimeTasks)) {

            weCustomerMessageTimeTasks.forEach(
                    s -> {
                        try {
                            semaphore.acquire();
                            if (s.getMessageInfo() != null && s.getMessageId() != null || (s.getMessageInfo().getPushType().equals(WeConstans.SEND_MESSAGE_CUSTOMER)
L
fix bug  
leejoker 已提交
159
                                    && CollectionUtils.isNotEmpty(s.getCustomersInfo())) || (s.getMessageInfo().getPushType().equals(WeConstans.SEND_MESSAGE_GROUP)
1
18356073052 已提交
160 161 162 163 164 165 166 167 168 169 170 171
                                    && CollectionUtils.isNotEmpty(s.getGroupsInfo()))) {
                                weCustomerMessageService.sendMessgae(s.getMessageInfo(), s.getMessageId(), s.getCustomersInfo(), s.getGroupsInfo());
                                //更新消息处理状态
                                customerMessageTimeTaskMapper.updateTaskSolvedById(s.getTaskId());
                            }
                            semaphore.release();
                        } catch (JsonProcessingException | InterruptedException e) {
                            log.error("定时群发消息处理异常:ex:{}", e);
                            e.printStackTrace();
                        }
                    }
            );
1
18356073052 已提交
172 173 174 175
        }

    }

176
}