syncRespMgr.c 6.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRespMgr.h"
17
#include "syncRaftEntry.h"
18
#include "syncRaftStore.h"
M
Minghao Li 已提交
19 20 21

SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
  SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
22 23 24 25
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
M
Minghao Li 已提交
26 27 28 29
  memset(pObj, 0, sizeof(SSyncRespMgr));

  pObj->pRespHash =
      taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
M
Minghao Li 已提交
30
  ASSERT(pObj->pRespHash != NULL);
M
Minghao Li 已提交
31 32 33 34 35 36 37 38 39
  pObj->ttl = ttl;
  pObj->data = data;
  pObj->seqNum = 0;
  taosThreadMutexInit(&(pObj->mutex), NULL);

  return pObj;
}

void syncRespMgrDestroy(SSyncRespMgr *pObj) {
M
Minghao Li 已提交
40 41 42 43 44 45 46
  if (pObj != NULL) {
    taosThreadMutexLock(&(pObj->mutex));
    taosHashCleanup(pObj->pRespHash);
    taosThreadMutexUnlock(&(pObj->mutex));
    taosThreadMutexDestroy(&(pObj->mutex));
    taosMemoryFree(pObj);
  }
M
Minghao Li 已提交
47 48 49 50 51 52 53 54
}

int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
  taosThreadMutexLock(&(pObj->mutex));

  uint64_t keyCode = ++(pObj->seqNum);
  taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));

M
Minghao Li 已提交
55
  SSyncNode *pSyncNode = pObj->data;
M
Minghao Li 已提交
56
  char       eventLog[128];
S
Shengliang Guan 已提交
57 58
  snprintf(eventLog, sizeof(eventLog), "save message handle, type:%s seq:%" PRIu64 " handle:%p",
           TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle);
M
Minghao Li 已提交
59
  syncNodeEventLog(pSyncNode, eventLog);
M
Minghao Li 已提交
60

M
Minghao Li 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
  taosThreadMutexUnlock(&(pObj->mutex));
  return keyCode;
}

int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index) {
  taosThreadMutexLock(&(pObj->mutex));

  taosHashRemove(pObj->pRespHash, &index, sizeof(index));

  taosThreadMutexUnlock(&(pObj->mutex));
  return 0;
}

int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
  taosThreadMutexLock(&(pObj->mutex));

  void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
  if (pTmp != NULL) {
    memcpy(pStub, pTmp, sizeof(SRespStub));
M
Minghao Li 已提交
80 81

    SSyncNode *pSyncNode = pObj->data;
M
Minghao Li 已提交
82
    char       eventLog[128];
S
Shengliang Guan 已提交
83 84
    snprintf(eventLog, sizeof(eventLog), "get message handle, type:%s seq:%" PRIu64 " handle:%p",
             TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
M
Minghao Li 已提交
85
    syncNodeEventLog(pSyncNode, eventLog);
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88 89 90 91 92 93 94 95 96 97 98 99
    taosThreadMutexUnlock(&(pObj->mutex));
    return 1;  // get one object
  }
  taosThreadMutexUnlock(&(pObj->mutex));
  return 0;  // get none object
}

int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
  taosThreadMutexLock(&(pObj->mutex));

  void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
  if (pTmp != NULL) {
    memcpy(pStub, pTmp, sizeof(SRespStub));
M
Minghao Li 已提交
100 101

    SSyncNode *pSyncNode = pObj->data;
M
Minghao Li 已提交
102
    char       eventLog[128];
S
Shengliang Guan 已提交
103 104
    snprintf(eventLog, sizeof(eventLog), "get-and-del message handle, type:%s seq:%" PRIu64 " handle:%p",
             TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
M
Minghao Li 已提交
105
    syncNodeEventLog(pSyncNode, eventLog);
M
Minghao Li 已提交
106

M
Minghao Li 已提交
107
    taosHashRemove(pObj->pRespHash, &index, sizeof(index));
M
Minghao Li 已提交
108
    taosThreadMutexUnlock(&(pObj->mutex));
M
Minghao Li 已提交
109 110 111 112 113 114
    return 1;  // get one object
  }
  taosThreadMutexUnlock(&(pObj->mutex));
  return 0;  // get none object
}

115 116 117 118 119 120
void syncRespCleanRsp(SSyncRespMgr *pObj) {
  taosThreadMutexLock(&(pObj->mutex));
  syncRespCleanByTTL(pObj, -1, true);
  taosThreadMutexUnlock(&(pObj->mutex));
}

M
Minghao Li 已提交
121 122
void syncRespClean(SSyncRespMgr *pObj) {
  taosThreadMutexLock(&(pObj->mutex));
123
  syncRespCleanByTTL(pObj, pObj->ttl, false);
M
Minghao Li 已提交
124 125 126
  taosThreadMutexUnlock(&(pObj->mutex));
}

127
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
128 129
  SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
  int        cnt = 0;
M
Minghao Li 已提交
130
  int        sum = 0;
131 132
  SSyncNode *pSyncNode = pObj->data;

M
Minghao Li 已提交
133
  SArray *delIndexArray = taosArrayInit(0, sizeof(uint64_t));
134
  ASSERT(delIndexArray != NULL);
M
Minghao Li 已提交
135
  sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
136 137

  while (pStub) {
M
Minghao Li 已提交
138
    size_t    len;
139
    void     *key = taosHashGetKey(pStub, &len);
M
Minghao Li 已提交
140
    uint64_t *pSeqNum = (uint64_t *)key;
M
Minghao Li 已提交
141
    sum++;
142 143

    int64_t nowMS = taosGetTimestampMs();
144
    if (nowMS - pStub->createTime > ttl || -1 == ttl) {
M
Minghao Li 已提交
145
      taosArrayPush(delIndexArray, pSeqNum);
146 147
      cnt++;

M
Minghao Li 已提交
148 149 150 151 152 153 154 155 156 157 158
      SFsmCbMeta cbMeta = {0};
      cbMeta.index = SYNC_INDEX_INVALID;
      cbMeta.lastConfigIndex = SYNC_INDEX_INVALID;
      cbMeta.isWeak = false;
      cbMeta.code = TSDB_CODE_SYN_TIMEOUT;
      cbMeta.state = pSyncNode->state;
      cbMeta.seqNum = *pSeqNum;
      cbMeta.term = SYNC_TERM_INVALID;
      cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
      cbMeta.flag = 0;

M
Minghao Li 已提交
159 160
      pStub->rpcMsg.pCont = NULL;
      pStub->rpcMsg.contLen = 0;
161 162 163 164 165 166 167 168

      // TODO: and make rpcMsg body, call commit cb
      // pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);

      pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
      if (pStub->rpcMsg.info.handle != NULL) {
        tmsgSendRsp(&(pStub->rpcMsg));
      }
169 170 171 172 173 174
    }

    pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
  }

  int32_t arraySize = taosArrayGetSize(delIndexArray);
M
Minghao Li 已提交
175
  sDebug("vgId:%d, resp mgr end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
176 177

  for (int32_t i = 0; i < arraySize; ++i) {
M
Minghao Li 已提交
178 179 180
    uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
    taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
    sDebug("vgId:%d, resp mgr clean by ttl, seq:%d", pSyncNode->vgId, *pSeqNum);
181 182 183
  }
  taosArrayDestroy(delIndexArray);
}