syncRespMgr.c 4.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRespMgr.h"
17
#include "syncRaftStore.h"
M
Minghao Li 已提交
18 19 20 21 22 23 24

SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
  SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
  memset(pObj, 0, sizeof(SSyncRespMgr));

  pObj->pRespHash =
      taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
M
Minghao Li 已提交
25
  ASSERT(pObj->pRespHash != NULL);
M
Minghao Li 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  pObj->ttl = ttl;
  pObj->data = data;
  pObj->seqNum = 0;
  taosThreadMutexInit(&(pObj->mutex), NULL);

  return pObj;
}

/**
 * Release a response manager created by syncRespMgrCreate().
 *
 * Cleans up the hash table while holding the mutex, then destroys the mutex
 * and frees the manager itself.
 *
 * @param pObj  Manager to destroy. NULL is accepted and ignored, so callers can
 *              invoke this unconditionally on cleanup paths.
 */
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
  if (pObj == NULL) {
    return;
  }

  taosThreadMutexLock(&(pObj->mutex));
  taosHashCleanup(pObj->pRespHash);
  taosThreadMutexUnlock(&(pObj->mutex));

  taosThreadMutexDestroy(&(pObj->mutex));
  taosMemoryFree(pObj);
}

/**
 * Register a response stub under a freshly generated sequence number.
 *
 * Under the manager's mutex, increments pObj->seqNum, puts sizeof(SRespStub)
 * bytes of *pStub into the hash table keyed by the new number, and writes an
 * event log entry via syncNodeEventLog().
 *
 * @param pObj   Response manager.
 * @param pStub  Stub to register.
 * @return       The sequence number assigned to the stub (converted to int64_t).
 *
 * NOTE(review): the result of taosHashPut() is not checked, so the sequence
 * number is returned even if the insertion did not succeed — confirm whether
 * callers need to be told.
 */
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
  taosThreadMutexLock(&(pObj->mutex));

  uint64_t keyCode = ++(pObj->seqNum);
  taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));

  SSyncNode *pSyncNode = pObj->data;
  char       eventLog[128];
  // uint64_t is not 'unsigned long' on every platform; cast and use %llu so the
  // conversion specifier always matches the argument type.
  snprintf(eventLog, sizeof(eventLog), "resp mgr add, type:%s,%d, seq:%llu, handle:%p, ahandle:%p",
           TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, (unsigned long long)keyCode,
           pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
  syncNodeEventLog(pSyncNode, eventLog);

  taosThreadMutexUnlock(&(pObj->mutex));
  return keyCode;
}

/**
 * Remove the entry keyed by `index` from the manager's hash table.
 *
 * The removal is performed while holding the manager's mutex. The result of
 * taosHashRemove() is not inspected.
 *
 * @param pObj   Response manager.
 * @param index  Sequence number used as the hash key.
 * @return       Always 0.
 */
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index) {
  taosThreadMutexLock(&pObj->mutex);
  (void)taosHashRemove(pObj->pRespHash, &index, sizeof(uint64_t));
  taosThreadMutexUnlock(&pObj->mutex);

  return 0;
}

/**
 * Look up the stub stored under `index` and copy it out.
 *
 * Under the manager's mutex, fetches the hash entry for `index`; when present,
 * copies sizeof(SRespStub) bytes into *pStub and writes an event log entry.
 * The entry stays in the table.
 *
 * @param pObj   Response manager.
 * @param index  Sequence number used as the hash key.
 * @param pStub  Output buffer; written only when an entry is found.
 * @return       1 if an entry was found and copied, 0 otherwise.
 */
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
  int32_t found = 0;  // get none object

  taosThreadMutexLock(&(pObj->mutex));

  void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
  if (pTmp != NULL) {
    memcpy(pStub, pTmp, sizeof(SRespStub));

    SSyncNode *pSyncNode = pObj->data;
    char       eventLog[128];
    // uint64_t is not 'unsigned long' on every platform; cast and use %llu so
    // the conversion specifier always matches the argument type.
    snprintf(eventLog, sizeof(eventLog), "resp mgr get, type:%s,%d, seq:%llu, handle:%p, ahandle:%p",
             TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, (unsigned long long)index,
             pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
    syncNodeEventLog(pSyncNode, eventLog);

    found = 1;  // get one object
  }

  taosThreadMutexUnlock(&(pObj->mutex));
  return found;
}

/**
 * Look up the stub stored under `index`, copy it out, and remove it.
 *
 * Under the manager's mutex, fetches the hash entry for `index`; when present,
 * copies sizeof(SRespStub) bytes into *pStub, writes an event log entry, and
 * then removes the entry from the table. Lookup and removal happen under one
 * lock acquisition.
 *
 * @param pObj   Response manager.
 * @param index  Sequence number used as the hash key.
 * @param pStub  Output buffer; written only when an entry is found.
 * @return       1 if an entry was found, copied and removed, 0 otherwise.
 */
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
  int32_t found = 0;  // get none object

  taosThreadMutexLock(&(pObj->mutex));

  void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
  if (pTmp != NULL) {
    // Copy first: pTmp points at the table's entry, which is removed below.
    memcpy(pStub, pTmp, sizeof(SRespStub));

    SSyncNode *pSyncNode = pObj->data;
    char       eventLog[128];
    // uint64_t is not 'unsigned long' on every platform; cast and use %llu so
    // the conversion specifier always matches the argument type.
    snprintf(eventLog, sizeof(eventLog), "resp mgr get-and-del, type:%s,%d, seq:%llu, handle:%p, ahandle:%p",
             TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, (unsigned long long)index,
             pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
    syncNodeEventLog(pSyncNode, eventLog);

    taosHashRemove(pObj->pRespHash, &index, sizeof(index));
    found = 1;  // get one object
  }

  taosThreadMutexUnlock(&(pObj->mutex));
  return found;
}

/**
 * Run the TTL-based cleanup using the manager's own configured TTL.
 *
 * Holds the manager's mutex for the duration of the syncRespCleanByTTL() call.
 *
 * @param pObj  Response manager.
 */
void syncRespClean(SSyncRespMgr *pObj) {
  taosThreadMutexLock(&pObj->mutex);

  const int64_t ttl = pObj->ttl;
  syncRespCleanByTTL(pObj, ttl);

  taosThreadMutexUnlock(&pObj->mutex);
}

M
Minghao Li 已提交
117
/**
 * Clean up entries according to a time-to-live value.
 *
 * Currently a no-op: the body performs no work and neither parameter is used.
 *
 * @param pObj  Response manager (unused).
 * @param ttl   Time-to-live value (unused).
 */
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
  (void)pObj;
  (void)ttl;
}