未验证 提交 35af88cb 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #16195 from taosdata/feature/3.0_interval_hash_optimize

enh: rsma batch process
......@@ -2663,6 +2663,10 @@ typedef struct {
SEpSet epSet;
} SVgEpSet;
typedef struct {
int32_t padding;
} SRSmaExecMsg;
typedef struct {
int64_t suid;
int8_t level;
......
......@@ -202,6 +202,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
......
......@@ -615,6 +615,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
......@@ -338,6 +338,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -63,6 +63,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray* list);
void *vnodeGetIdx(SVnode *pVnode);
void *vnodeGetIvtIdx(SVnode *pVnode);
......
......@@ -57,9 +57,10 @@ typedef struct {
void *tmrHandle; // shared by all fetch tasks
} SSmaMgmt;
#define SMA_ENV_LOCK(env) (&(env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_LOCK(env) (&(env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_RSMA_STAT(sma) ((SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)(sma)->pRSmaEnv))
struct STSmaStat {
int8_t state; // ETsdbSmaStat
......@@ -86,15 +87,17 @@ struct SQTaskFWriter {
};
struct SRSmaStat {
SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit
int64_t refId; // shared by fetch tasks
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit
int64_t refId; // shared by fetch tasks
volatile int64_t qBufSize; // queue buffer size
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
int8_t execStat; // 0 not in exec , 1 in exec
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
SHashObj *infoHash; // key: suid, value: SRSmaInfo
SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2
};
struct SSmaStat {
......@@ -105,34 +108,40 @@ struct SSmaStat {
T_REF_DECLARE()
};
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_FS_LOCK(r) (&(r)->lock)
#define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
#define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->infoHash)
#define RSMA_FETCH_HASH(r) ((r)->fetchHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem {
int8_t level;
int8_t triggerStat;
int32_t maxDelay;
tmr_h tmrId;
int8_t level;
int8_t triggerStat;
uint16_t interval; // second
int32_t maxDelay;
tmr_h tmrId;
};
struct SRSmaInfo {
STSchema *pTSchema;
int64_t suid;
int64_t refId; // refId of SRSmaStat
int8_t delFlag;
uint64_t delFlag : 1;
uint64_t lastReceived : 63; // second
T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable
STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t
STaosQueue *iQueue; // immutable buffer queue of SubmitReq
STaosQall *iQall; // immutable buffer qall of SubmitReq
};
#define RSMA_INFO_HEAD_LEN 32
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
......@@ -161,6 +170,12 @@ enum {
RSMA_RESTORE_SYNC = 2,
};
typedef enum {
RSMA_EXEC_OVERFLOW = 1, // triggered by queue buf overflow
RSMA_EXEC_TIMEOUT = 2, // triggered by timer
RSMA_EXEC_COMMIT = 3, // triggered by commit
} ERsmaExecType;
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
......@@ -228,12 +243,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc);
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
......
......@@ -65,6 +65,7 @@ struct SVBufPool {
SVBufPool* next;
SVnode* pVnode;
volatile int32_t nRef;
TdThreadSpinlock lock;
int64_t size;
uint8_t* ptr;
SVBufPoolNode* pTail;
......
......@@ -199,6 +199,7 @@ int32_t smaAsyncCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now);
int32_t smaProcessFetch(SSma* pSma, void* pMsg);
int32_t smaProcessExec(SSma* pSma, void* pMsg);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
......
......@@ -83,8 +83,7 @@ int32_t smaBegin(SSma *pSma) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
int8_t rsmaTriggerStat =
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
......@@ -123,7 +122,7 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
// step 1: set rsma stat paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
......@@ -289,8 +288,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS;
}
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv));
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
// cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
......@@ -299,10 +297,9 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
}
/**
* @brief Rsma async commit implementation
* @brief Rsma async commit implementation(only do some necessary light weighted task)
* 1) set rsma stat TASK_TRIGGER_STAT_PAUSED
* 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
* 3)
*
* @param pSma
* @return int32_t
......@@ -314,7 +311,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
// step 1: set rsma stat
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
......@@ -336,28 +333,55 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
}
// step 3: swap rsmaInfoHash and iRsmaInfoHash
/**
* @brief step 3: consume the SubmitReq in buffer
* 1) This is high cost task and should not put in asyncPreCommit originally.
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
*/
nLoops = 0;
smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1);
if (old == 0) break;
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
}
}
smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) {
atomic_store_8(&pRSmaStat->execStat, 0);
return TSDB_CODE_FAILED;
}
// step 4: swap queue/qall and iQueue/iQall
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat));
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) =
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
if (!RSMA_INFO_HASH(pRSmaStat)) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
TSWAP(pInfo->iQall, pInfo->qall);
TSWAP(pInfo->iQueue, pInfo->queue);
TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
}
atomic_store_64(&pRSmaStat->qBufSize, 0);
atomic_store_8(&pRSmaStat->execStat, 0);
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 4: others
// step 5: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
return TSDB_CODE_SUCCESS;
......@@ -375,17 +399,18 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
// perform persist task for qTaskInfo
tdRSmaPersistExecImpl(pRSmaStat, RSMA_IMU_INFO_HASH(pRSmaStat));
// perform persist task for qTaskInfo operator
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty.
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsma infoHash not empty.
*
* @param pSma
* @return int32_t
......@@ -396,65 +421,66 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SArray *rsmaDeleted = NULL;
// step 1: merge rsmaInfoHash and iRsmaInfoHash
// step 1: merge qTaskInfo and iQTaskInfo
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
#if 0
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
// just switch the hash pointer if rsmaInfoHash is empty
if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat);
RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
}
} else {
#endif
#if 1
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
smaDebug(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%" PRIi64,
SMA_VID(pSma), *pSuid);
} else {
smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
"table:%" PRIi64,
SMA_VID(pSma), refVal, *pSuid);
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) {
if (!rsmaDeleted) {
if ((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))) {
taosArrayPush(rsmaDeleted, pSuid);
}
}
} else {
smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
"table:%" PRIi64,
SMA_VID(pSma), refVal, *pSuid);
}
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
continue;
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
continue;
}
if (pRSmaInfo->taskInfo[0]) {
if (pRSmaInfo->iTaskInfo[0]) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
pRSmaInfo->iTaskInfo[0] = NULL;
}
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
} else {
// free the resources
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]);
}
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
}
#endif
// }
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i);
void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
smaDebug(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%" PRIi64,
SMA_VID(pSma), *pSuid);
}
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
}
taosArrayDestroy(rsmaDeleted);
// TODO: remove suid in files?
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
......
......@@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
if (!pRSmaInfo) return 0;
int ref = T_REF_INC(pRSmaInfo);
smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
return 0;
......@@ -228,7 +228,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) {
taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED;
}
RSMA_FETCH_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_FETCH_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED;
}
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
......@@ -264,8 +269,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
// step 2: destroy the rsma info and associated fetch tasks
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
#if 1
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
while (infoHash) {
......@@ -274,10 +277,12 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
}
}
#endif
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 3: wait all triggered fetch tasks finished
// step 3: destroy the rsma fetch hash
taosHashCleanup(RSMA_FETCH_HASH(pStat));
// step 4: wait all triggered fetch tasks finished
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
......@@ -293,7 +298,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
}
}
// step 4: free pStat
// step 5: free pStat
taosMemoryFreeClear(pStat);
}
}
......@@ -318,9 +323,9 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) {
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
int32_t vid = SMA_VID(pRSmaStat->pSma);
int64_t refId = RSMA_REF_ID(pRSmaStat);
if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
......
......@@ -139,7 +139,6 @@ static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf)
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size);
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
pHdr->type = SNAP_DATA_QTASK;
pHdr->size = size;
......@@ -279,7 +278,8 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!qTaskF) {
code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, tstrerror(code));
smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName,
tstrerror(code));
goto _err;
}
qWriter->pWriteH = qTaskF;
......@@ -309,7 +309,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
if (rollback) {
// TODO: rsma1/rsma2
// qtaskinfo
if(pWriter->pQTaskFWriter) {
if (pWriter->pQTaskFWriter) {
taosRemoveFile(pWriter->pQTaskFWriter->fname);
}
} else {
......
......@@ -175,7 +175,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
}
tdRefSmaStat(pSma, pStat);
pTsmaStat = SMA_TSMA_STAT(pStat);
pTsmaStat = SMA_STAT_TSMA(pStat);
if (!pTsmaStat->pTSma) {
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
......
......@@ -350,49 +350,45 @@ _err:
}
/**
* @brief pTSchema is shared
* @brief Clone qTaskInfo of SRSmaInfo
*
* @param pSma
* @param pDest
* @param pSrc
* @param pInfo
* @return int32_t
*/
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) {
SVnode *pVnode = pSma->pVnode;
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
SRSmaParam *param = NULL;
if (!pSrc) {
*pDest = NULL;
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid);
if (metaGetTableEntryByUid(&mr, pSrc->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid,
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == pSrc->suid);
ASSERT(mr.me.uid == pInfo->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) {
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err;
}
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid);
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
} else {
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
goto _err;
}
metaReaderClear(&mr);
*pDest = pSrc; // pointer copy
return TSDB_CODE_SUCCESS;
_err:
*pDest = NULL;
metaReaderClear(&mr);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr());
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
return TSDB_CODE_FAILED;
}
\ No newline at end of file
......@@ -2585,32 +2585,6 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
/**
* @brief Get all suids since suid
*
* @param pMeta
* @param suid return all suids in one vnode if suid is 0
* @param list
* @return int32_t
*/
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
if (!pCur) {
return TSDB_CODE_FAILED;
}
while (1) {
tb_uid_t id = metaStbCursorNext(pCur);
if (id == 0) {
break;
}
taosArrayPush(list, &id);
}
metaCloseStbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
......
......@@ -78,7 +78,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
SVBufPoolNode *pNode;
void *p;
taosThreadSpinLock(&pPool->lock);
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
// allocate from the anchor node
p = pPool->ptr;
......@@ -89,6 +89,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosThreadSpinUnlock(&pPool->lock);
return NULL;
}
......@@ -101,7 +102,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
pPool->size = pPool->size + sizeof(*pNode) + size;
}
taosThreadSpinUnlock(&pPool->lock);
return p;
}
......@@ -129,6 +130,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
return -1;
}
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
taosMemoryFree(pPool);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pPool->next = NULL;
pPool->pVnode = pVnode;
pPool->nRef = 0;
......@@ -145,6 +152,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool);
taosThreadSpinDestroy(&pPool->lock);
taosMemoryFree(pPool);
return 0;
}
......
......@@ -220,6 +220,10 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
smaAsyncPreCommit(pVnode->pSma);
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
......@@ -237,10 +241,6 @@ int vnodeCommit(SVnode *pVnode) {
}
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
smaAsyncPreCommit(pVnode->pSma);
// commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) {
ASSERT(0);
......
......@@ -424,6 +424,25 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeGetStbIdList(SVnode* pVnode, int64_t suid, SArray* list) {
SMStbCursor* pCur = metaOpenStbCursor(pVnode->pMeta, suid);
if (!pCur) {
return TSDB_CODE_FAILED;
}
while (1) {
tb_uid_t id = metaStbCursorNext(pCur);
if (id == 0) {
break;
}
taosArrayPush(list, &id);
}
metaCloseStbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid);
if (!pCur) {
......
......@@ -303,6 +303,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSMA:
return smaProcessFetch(pVnode->pSma, pMsg);
case TDMT_VND_EXEC_RSMA:
return smaProcessExec(pVnode->pSma, pMsg);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......@@ -530,7 +532,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
}
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
tdUpdateTbUidList(pVnode->pSma, pStore);
if (tdUpdateTbUidList(pVnode->pSma, pStore) < 0) {
goto _exit;
}
tdUidStoreFree(pStore);
// prepare rsp
......
......@@ -17,7 +17,6 @@
#define TDENGINE_TSIMPLEHASH_H
#include "tarray.h"
#include "tlockfree.h"
#ifdef __cplusplus
extern "C" {
......@@ -27,6 +26,10 @@ typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len);
typedef void (*_hash_free_fn_t)(void *);
/**
* @brief single thread hash
*
*/
typedef struct SSHashObj SSHashObj;
/**
......@@ -36,7 +39,7 @@ typedef struct SSHashObj SSHashObj;
* @param fn hash function to generate the hash value
* @return
*/
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen);
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn);
/**
* return the size of hash table
......@@ -48,22 +51,26 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj);
int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
/**
* put element into hash table, if the element with the same key exists, update it
* @param pHashObj
* @param key
* @param data
* @return
* @brief put element into hash table, if the element with the same key exists, update it
*
* @param pHashObj
* @param key
* @param keyLen
* @param data
* @param dataLen
* @return int32_t
*/
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data);
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen);
/**
* return the payload data with the specified key
*
* @param pHashObj
* @param key
* @param keyLen
* @return
*/
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key);
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen);
/**
* remove item with the specified key
......@@ -71,7 +78,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key);
* @param key
* @param keyLen
*/
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key);
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen);
/**
* Clear the hash table.
......@@ -98,7 +105,7 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
* @param keyLen
* @return
*/
void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen);
void *tSimpleHashGetKey(void *data, size_t* keyLen);
/**
* Create the hash table iterator
......@@ -109,7 +116,18 @@ void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen);
*/
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter);
/**
* Create the hash table iterator
*
* @param pHashObj
* @param data
* @param key
* @param iter
* @return void*
*/
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSIMPLEHASH_H
#endif // TDENGINE_TSIMPLEHASH_H
\ No newline at end of file
......@@ -55,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayClear(pInfo->pBlockLists);
if (type == STREAM_INPUT__MERGED_SUBMIT) {
ASSERT(numOfBlocks > 1);
// ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq);
......
......@@ -14,8 +14,8 @@
*/
#include "tsimplehash.h"
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
......@@ -31,19 +31,21 @@
taosMemoryFreeClear(_n); \
} while (0);
#pragma pack(push, 4)
typedef struct SHNode {
struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
char data[];
} SHNode;
#pragma pack(pop)
struct SSHashObj {
SHNode **hashList;
size_t capacity; // number of slots
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
int32_t keyLen;
int32_t dataLen;
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
};
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
......@@ -54,7 +56,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
return i;
}
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen) {
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
ASSERT(fn != NULL);
if (capacity == 0) {
......@@ -74,8 +76,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
pHashObj->hashFp = fn;
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->keyLen = keyLen;
pHashObj->dataLen = dataLen;
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
if (!pHashObj->hashList) {
......@@ -93,40 +93,41 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
return (int32_t)atomic_load_64((int64_t *)&pHashObj->size);
}
static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dsize);
static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *data, size_t dataLen, uint32_t hashVal) {
SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dataLen);
if (!pNewNode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pNewNode->keyLen = keyLen;
pNewNode->dataLen = dataLen;
pNewNode->next = NULL;
memcpy(GET_SHASH_NODE_DATA(pNewNode), pData, dsize);
memcpy(GET_SHASH_NODE_KEY(pNewNode, dsize), key, keyLen);
memcpy(GET_SHASH_NODE_DATA(pNewNode), data, dataLen);
memcpy(GET_SHASH_NODE_KEY(pNewNode, dataLen), key, keyLen);
return pNewNode;
}
static void taosHashTableResize(SSHashObj *pHashObj) {
static void tSimpleHashTableResize(SSHashObj *pHashObj) {
if (!SHASH_NEED_RESIZE(pHashObj)) {
return;
}
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
if (newCapacity > HASH_MAX_CAPACITY) {
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY);
uDebug("current capacity:%zu, maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached",
pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
if (!pNewEntryList) {
// qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return;
}
size_t inc = newCapacity - pHashObj->capacity;
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc);
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc * sizeof(void *));
pHashObj->hashList = pNewEntryList;
pHashObj->capacity = newCapacity;
......@@ -141,8 +142,8 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
SHNode *pPrev = NULL;
while (pNode != NULL) {
void *key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
void *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pNode->keyLen);
int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity);
pNext = pNode->next;
......@@ -170,23 +171,23 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen) {
if (!pHashObj || !key) {
return -1;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// need the resize process, write lock applied
if (SHASH_NEED_RESIZE(pHashObj)) {
taosHashTableResize(pHashObj);
tSimpleHashTableResize(pHashObj);
}
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot];
if (!pNode) {
SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal);
SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal);
if (!pNewNode) {
return -1;
}
......@@ -197,14 +198,14 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
}
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
break;
}
pNode = pNode->next;
}
if (!pNode) {
SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal);
SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal);
if (!pNewNode) {
return -1;
}
......@@ -212,16 +213,16 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
pHashObj->hashList[slot] = pNewNode;
atomic_add_fetch_64(&pHashObj->size, 1);
} else { // update data
memcpy(GET_SHASH_NODE_DATA(pNode), data, pHashObj->dataLen);
memcpy(GET_SHASH_NODE_DATA(pNode), data, dataLen);
}
return 0;
}
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, int32_t index) {
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) {
SHNode *pNode = pHashObj->hashList[index];
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
break;
}
......@@ -233,12 +234,12 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void
static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) { return tSimpleHashGetSize(pHashObj) == 0; }
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) {
if (!pHashObj || taosHashTableEmpty(pHashObj) || !key) {
return NULL;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot];
......@@ -247,7 +248,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
}
char *data = NULL;
pNode = doSearchInEntryList(pHashObj, key, slot);
pNode = doSearchInEntryList(pHashObj, key, keyLen, slot);
if (pNode != NULL) {
data = GET_SHASH_NODE_DATA(pNode);
}
......@@ -255,19 +256,19 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
return data;
}
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) {
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
if (!pHashObj || !key) {
return TSDB_CODE_FAILED;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot];
SHNode *pPrev = NULL;
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
if (!pPrev) {
pHashObj->hashList[slot] = pNode->next;
} else {
......@@ -312,6 +313,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
tSimpleHashClear(pHashObj);
taosMemoryFreeClear(pHashObj->hashList);
taosMemoryFree(pHashObj);
}
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
......@@ -322,26 +324,54 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj);
}
void *tSimpleHashGetKey(const SSHashObj *pHashObj, void *data, size_t *keyLen) {
#if 0
int32_t offset = offsetof(SHNode, data);
SHNode *node = ((SHNode *)(char *)data - offset);
void *tSimpleHashGetKey(void *data, size_t *keyLen) {
SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
if (keyLen) {
*keyLen = pHashObj->keyLen;
*keyLen = node->keyLen;
}
return POINTER_SHIFT(data, pHashObj->dataLen);
return POINTER_SHIFT(data, node->dataLen);
}
return GET_SHASH_NODE_KEY(node, pHashObj->dataLen);
#endif
if (keyLen) {
*keyLen = pHashObj->keyLen;
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
if (!pHashObj) {
return NULL;
}
return POINTER_SHIFT(data, pHashObj->dataLen);
SHNode *pNode = NULL;
if (!data) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
}
*iter = i;
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
}
pNode = (SHNode *)((char *)data - offsetof(SHNode, data));
if (pNode->next) {
return GET_SHASH_NODE_DATA(pNode->next);
}
++(*iter);
for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
}
*iter = i;
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
}
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter) {
if (!pHashObj) {
return NULL;
}
......@@ -355,6 +385,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
continue;
}
*iter = i;
if (key) {
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
}
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
......@@ -363,6 +396,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
pNode = (SHNode *)((char *)data - offsetof(SHNode, data));
if (pNode->next) {
if (key) {
*key = GET_SHASH_NODE_KEY(pNode->next, pNode->next->dataLen);
}
return GET_SHASH_NODE_DATA(pNode->next);
}
......@@ -373,6 +409,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
continue;
}
*iter = i;
if (key) {
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
}
return GET_SHASH_NODE_DATA(pNode);
}
......
......@@ -32,31 +32,33 @@
TEST(testCase, tSimpleHashTest) {
SSHashObj *pHashObj =
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), sizeof(int64_t), sizeof(int64_t));
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
assert(pHashObj != nullptr);
ASSERT_EQ(0, tSimpleHashGetSize(pHashObj));
size_t keyLen = sizeof(int64_t);
size_t dataLen = sizeof(int64_t);
int64_t originKeySum = 0;
for (int64_t i = 1; i <= 100; ++i) {
originKeySum += i;
tSimpleHashPut(pHashObj, (const void *)&i, (const void *)&i);
tSimpleHashPut(pHashObj, (const void *)&i, keyLen, (const void *)&i, dataLen);
ASSERT_EQ(i, tSimpleHashGetSize(pHashObj));
}
for (int64_t i = 1; i <= 100; ++i) {
void *data = tSimpleHashGet(pHashObj, (const void *)&i);
void *data = tSimpleHashGet(pHashObj, (const void *)&i, keyLen);
ASSERT_EQ(i, *(int64_t *)data);
}
void *data = NULL;
int32_t iter = 0;
int64_t keySum = 0;
int64_t dataSum = 0;
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(pHashObj, data, NULL);
void *key = tSimpleHashGetKey(data, NULL);
keySum += *(int64_t *)key;
dataSum += *(int64_t *)data;
}
......@@ -65,7 +67,7 @@ TEST(testCase, tSimpleHashTest) {
ASSERT_EQ(keySum, originKeySum);
for (int64_t i = 1; i <= 100; ++i) {
tSimpleHashRemove(pHashObj, (const void *)&i);
tSimpleHashRemove(pHashObj, (const void *)&i, keyLen);
ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj));
}
......
......@@ -617,6 +617,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册