From 3e0e7a87be6075cefb04dc6b212cdaf46a7a584c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 23 Sep 2022 14:30:47 +0800 Subject: [PATCH] fix(tmq): save ntb check to another tb --- source/dnode/vnode/src/tq/tqMeta.c | 31 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 62f8debccb..c55e1059cf 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -25,17 +25,17 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1; - } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){ + } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid); if (tEncodeI32(pEncoder, size) < 0) return -1; - void *pIter = NULL; + void* pIter = NULL; pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); - while(pIter){ - int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL); + while (pIter) { + int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL); if (tEncodeI64(pEncoder, *tbUid) < 0) return -1; pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); } - } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ + } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1; } tEndEncode(pEncoder); @@ -52,17 +52,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1; - }else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){ + } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); int32_t size = 0; if (tDecodeI32(pDecoder, &size) < 0) return -1; - for(int32_t i = 0; i < size; i++){ + for (int32_t i = 0; i < size; i++) { int64_t tbUid = 0; if (tDecodeI64(pDecoder, &tbUid) < 0) return -1; taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); } - } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ + } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1; } tEndDecode(pDecoder); @@ -117,7 +117,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ return -1; } - if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) { + if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, &txn) < 0) { return -1; } @@ -284,7 +284,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { }; if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - handle.execHandle.task = qCreateQueueExecTaskInfo( handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.task); @@ -297,9 +296,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); - buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); - handle.execHandle.task = - qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); + buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, + (SSnapContext**)(&reader.sContext)); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); @@ -314,9 +313,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); - buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); - handle.execHandle.task = - qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); + buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, + handle.fetchMeta, (SSnapContext**)(&reader.sContext)); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); -- GitLab