syncRespMgr.c 6.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRespMgr.h"
18
#include "syncRaftEntry.h"
19
#include "syncRaftStore.h"
S
Shengliang Guan 已提交
20
#include "syncUtil.h"
M
Minghao Li 已提交
21 22

SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
S
Shengliang Guan 已提交
23
  SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr));
24 25 26 27
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
M
Minghao Li 已提交
28 29 30

  pObj->pRespHash =
      taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
S
Shengliang Guan 已提交
31 32
  if (pObj->pRespHash == NULL) return NULL;

M
Minghao Li 已提交
33 34 35 36 37
  pObj->ttl = ttl;
  pObj->data = data;
  pObj->seqNum = 0;
  taosThreadMutexInit(&(pObj->mutex), NULL);

38
  SSyncNode *pNode = pObj->data;
S
Shengliang Guan 已提交
39
  sDebug("vgId:%d, resp manager create", pNode->vgId);
M
Minghao Li 已提交
40 41 42 43
  return pObj;
}

/*
 * Tear down a response manager created by syncRespMgrCreate.
 *
 * A NULL manager is ignored. Otherwise the hash table is cleaned up while the
 * manager's mutex is held, then the mutex is destroyed and the manager freed.
 */
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
  if (NULL == pObj) {
    return;
  }

  SSyncNode *pSyncNode = pObj->data;
  sDebug("vgId:%d, resp manager destroy", pSyncNode->vgId);

  // Drop the table under the lock, then release the lock before destroying it.
  taosThreadMutexLock(&(pObj->mutex));
  taosHashCleanup(pObj->pRespHash);
  taosThreadMutexUnlock(&(pObj->mutex));

  taosThreadMutexDestroy(&pObj->mutex);
  taosMemoryFree(pObj);
}

S
Shengliang Guan 已提交
56 57
/*
 * Store a copy of *pStub under a freshly issued sequence number.
 *
 * The counter is advanced and the stub inserted while the manager's mutex is
 * held. The result of the hash insertion is only traced, not reported to the
 * caller.
 *
 * Returns the sequence number assigned to the stub.
 */
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub) {
  uint64_t newSeq;
  int32_t  putCode;

  taosThreadMutexLock(&pObj->mutex);

  pObj->seqNum += 1;
  newSeq = pObj->seqNum;

  putCode = taosHashPut(pObj->pRespHash, &newSeq, sizeof(newSeq), pStub, sizeof(*pStub));
  sNTrace(pObj->data, "save message handle:%p, type:%s seq:%" PRIu64 " code:0x%x", pStub->rpcMsg.info.handle,
          TMSG_INFO(pStub->rpcMsg.msgType), newSeq, putCode);

  taosThreadMutexUnlock(&pObj->mutex);

  return newSeq;
}

S
Shengliang Guan 已提交
68 69
/*
 * Remove the stub stored under seq.
 *
 * The removal runs with the manager's mutex held.
 *
 * Returns whatever taosHashRemove returned for the key.
 */
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq) {
  int32_t rc;

  taosThreadMutexLock(&pObj->mutex);
  rc = taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t));
  sNTrace(pObj->data, "remove message handle, seq:%" PRIu64 " code:%d", seq, rc);
  taosThreadMutexUnlock(&pObj->mutex);

  return rc;
}

S
Shengliang Guan 已提交
78 79
/*
 * Look up the stub stored under seq and copy it into *pStub.
 *
 * The lookup and the copy both happen with the manager's mutex held; the
 * entry stays in the table. *pStub is left untouched when nothing is found.
 *
 * Returns 1 when a stub was found and copied, 0 when seq is unknown.
 */
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {
  int32_t numFound = 0;

  taosThreadMutexLock(&pObj->mutex);

  SRespStub *pEntry = taosHashGet(pObj->pRespHash, &seq, sizeof(seq));
  if (pEntry == NULL) {
    sNError(pObj->data, "get message handle, no object of seq:%" PRIu64, seq);
  } else {
    memcpy(pStub, pEntry, sizeof(*pStub));
    sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType), seq,
            pStub->rpcMsg.info.handle);
    numFound = 1;  // get one object
  }

  taosThreadMutexUnlock(&pObj->mutex);
  return numFound;
}

S
Shengliang Guan 已提交
97 98
/*
 * Fetch the RPC handle info stored under seq and remove the entry.
 *
 * Lookup, copy and removal are done in one critical section under the
 * manager's mutex. *pInfo is written only when the entry exists.
 *
 * Returns 1 when an entry was found (and removed), 0 when seq is unknown.
 */
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo) {
  int32_t numFound = 0;

  taosThreadMutexLock(&pObj->mutex);

  SRespStub *pEntry = taosHashGet(pObj->pRespHash, &seq, sizeof(seq));
  if (pEntry == NULL) {
    sNError(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq);
  } else {
    // Copy out and trace before removing: pEntry points into the table.
    *pInfo = pEntry->rpcMsg.info;
    sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pEntry->rpcMsg.info.handle,
            TMSG_INFO(pEntry->rpcMsg.msgType), seq);
    taosHashRemove(pObj->pRespHash, &seq, sizeof(seq));
    numFound = 1;  // get one object
  }

  taosThreadMutexUnlock(&pObj->mutex);
  return numFound;
}

S
Shengliang Guan 已提交
117
/*
 * Expire pending response stubs and answer their RPCs with a timeout.
 *
 * Every stub whose age (now - createTime, in milliseconds) exceeds ttl is sent
 * a TSDB_CODE_SYN_TIMEOUT response and then removed from the table. A ttl of
 * -1 expires every stub regardless of age.
 *
 * pObj: response manager; both callers in this file hold pObj->mutex around
 *       this call, and the function itself takes no lock.
 * ttl:  age limit in milliseconds, or -1 for "all entries".
 * rsp:  currently unused; kept so the signature stays unchanged.
 *
 * If the scratch array cannot be allocated the sweep is skipped entirely.
 */
static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
  (void)rsp;

  SSyncNode *pSyncNode = pObj->data;
  int        cnt = 0;
  int        sum = 0;

  // Allocate the scratch array before starting the hash iteration so that the
  // early return below cannot leave an iterator open.
  SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
  if (delIndexArray == NULL) return;

  sDebug("vgId:%d, resp manager begin clean by ttl", pSyncNode->vgId);

  // Expired keys are collected here and removed after the walk, so the table
  // is not modified while it is being iterated.
  for (SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL); pStub != NULL;
       pStub = taosHashIterate(pObj->pRespHash, pStub)) {
    size_t    len;
    void     *key = taosHashGetKey(pStub, &len);
    uint64_t *pSeqNum = (uint64_t *)key;
    sum++;

    int64_t nowMS = taosGetTimestampMs();
    if (!(nowMS - pStub->createTime > ttl || -1 == ttl)) {
      continue;
    }

    if (taosArrayPush(delIndexArray, pSeqNum) == NULL) {
      // The entry could not be queued for removal. Leave it unanswered so a
      // later sweep handles it, instead of responding now and again later.
      sNError(pSyncNode, "resp manager clean by ttl, failed to record seq:%" PRIu64, *pSeqNum);
      continue;
    }
    cnt++;

    pStub->rpcMsg.pCont = NULL;
    pStub->rpcMsg.contLen = 0;

    // TODO: build the rpcMsg body and invoke the FSM commit callback.
    SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
    sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pSyncNode->vgId, rpcMsg.info.handle,
          TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
    rpcSendResponse(&rpcMsg);
  }

  int32_t arraySize = (int32_t)taosArrayGetSize(delIndexArray);
  sDebug("vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);

  for (int32_t i = 0; i < arraySize; ++i) {
    uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
    taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
    // The sequence number is unsigned, so print it with PRIu64.
    sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRIu64, pSyncNode->vgId, *pSeqNum);
  }
  taosArrayDestroy(delIndexArray);
}
S
Shengliang Guan 已提交
174 175

/*
 * Expire every pending response stub, regardless of age.
 *
 * Runs the TTL sweep with ttl = -1 while holding the manager's mutex.
 */
void syncRespCleanRsp(SSyncRespMgr *pObj) {
  SSyncNode *pSyncNode = pObj->data;
  sTrace("vgId:%d, clean all resp", pSyncNode->vgId);

  taosThreadMutexLock(&(pObj->mutex));
  syncRespCleanByTTL(pObj, -1, true);
  taosThreadMutexUnlock(&(pObj->mutex));
}

/*
 * Expire pending response stubs older than the manager's configured ttl.
 *
 * Runs the TTL sweep with pObj->ttl while holding the manager's mutex.
 */
void syncRespClean(SSyncRespMgr *pObj) {
  SSyncNode *pSyncNode = pObj->data;
  sTrace("vgId:%d, clean resp by ttl", pSyncNode->vgId);

  taosThreadMutexLock(&(pObj->mutex));
  syncRespCleanByTTL(pObj, pObj->ttl, false);
  taosThreadMutexUnlock(&(pObj->mutex));
}