diff --git a/include/util/tarray.h b/include/util/tarray.h index a8510e4bc8295e7e40092ea5ae687717bda20676..278f9f6baba4fcf8e8d8b7a233228b96e4921377 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -69,14 +69,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles); */ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)); -/** - * - * @param pArray - * @param comparFn - * @param fp - */ -void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)); - /** * add all element from the source array list into the destination * @param pArray @@ -252,14 +244,6 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode); void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz); -/** - * swap array - * @param a - * @param b - * @return - */ -void taosArraySwap(SArray* a, SArray* b); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2f560910d430048a653aa0209c1aeb1e6e3a56bb..616f69cd5ba7598c2aa8f9e7afd486011962e908 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -601,6 +601,13 @@ static void* topicNameDup(void* p){ return taosStrdup((char*) p); } +static void freeItem(void* param) { + void* pItem = *(void**)param; + if (pItem != NULL) { + taosMemoryFree(pItem); + } +} + int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; char *msgStr = pMsg->pCont; @@ -616,7 +623,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t code = -1; SArray *pTopicList = subscribe.topicNames; taosArraySort(pTopicList, taosArrayCompareString); - taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree); + taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem); int32_t newTopicNum = taosArrayGetSize(pTopicList); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index de2732fcb550f1332c48ec58521bb708c8a33618..5210a8cc66bced3f5bdc9f7f28dc3942f62b2e5a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -885,6 +885,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); + // kill executing task + qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + if (pTaskInfo != NULL) { +// qAsyncKillTask(pTaskInfo); + } + taosWLockLatch(&pTq->lock); atomic_store_32(&pHandle->epoch, -1); @@ -895,7 +901,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_add_fetch_32(&pHandle->epoch, 1); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pHandle->execHandle.task); + qStreamCloseTsdbReader(pTaskInfo); + } + + // reset the error code. + if (pHandle->execHandle.task != NULL) { + } taosWUnLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index a62101eb4702442a580b2bfee33643106df41767..ce86a6757e5388d8895ea31dd4dddb6b4c01a2ed 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -65,16 +65,17 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; + int32_t vgId = TD_VID(pTq->pVnode); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return"); + tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); if (pOffset->type == TMQ_OFFSET__LOG) { pRsp->rspOffset = *pOffset; return 0; } else { tqOffsetResetToLog(pOffset, pHandle->snapshotVer); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return"); + tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); pRsp->rspOffset = *pOffset; return 0; } @@ -86,13 +87,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId); + tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId); if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + tqError("vgId:%d, task exec error since %s, consumer:0x%" PRIx64, vgId, terrstr(), + pHandle->consumerId); return -1; } - tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock); + tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, vgId, pDataBlock); // current scan should be stopped asap, since the rebalance occurs. if (pDataBlock == NULL) { @@ -115,15 +117,16 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs } if (pRsp->rspOffset.type == 0) { - tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts, - pRsp->rspOffset.uid, pRsp->rspOffset.version); + tqError("vgId:%d, expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, vgId, pRsp->rspOffset.type, + pRsp->rspOffset.ts, pRsp->rspOffset.uid, pRsp->rspOffset.version); return -1; } if (pRsp->withTbName || pRsp->withSchema) { - tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema); + tqError("vgId:%d, get column should not with meta:%d,%d", vgId, pRsp->withTbName, pRsp->withSchema); return -1; } + return 0; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 67174c3267124ec380cd9e55d388fa9071b474df..24a26d575a0c99a127afdda407b76b203054d13a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -633,7 +633,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } } -bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? true : false; } +bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code);} void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 7467fa29481d932632b612fc44550fd024542fc3..6c1d3e17bb6b05786629bd1d2ec02588a9d083a5 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -140,46 +140,7 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) taosArraySet(pArray, pos + 1, p2); memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize); - pos += 1; - } else { pos += 1; - } - } - } - - if (fp != NULL) { - for (int32_t i = pos + 1; i < pArray->size; ++i) { - void* p = taosArrayGet(pArray, i); - fp(p); - } - } - - pArray->size = pos + 1; -} - -void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { - size_t size = pArray->size; - if (size <= 1) { - return; - } - - int32_t pos = 0; - for (int32_t i = 1; i < size; ++i) { - char* p1 = taosArrayGet(pArray, pos); - char* p2 = taosArrayGet(pArray, i); - - if (comparFn(p1, p2) == 0) { - // do nothing - } else { - if (pos + 1 != i) { - void* p = taosArrayGetP(pArray, pos + 1); - if (fp != NULL) { - fp(p); - } - - taosArraySet(pArray, pos + 1, p2); - memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize); - pos += 1; } else { pos += 1; } @@ -188,7 +149,7 @@ void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp if (fp != NULL) { for (int32_t i = pos + 1; i < pArray->size; ++i) { - void* p = taosArrayGetP(pArray, i); + void* p = taosArrayGet(pArray, i); fp(p); } } @@ -392,20 +353,6 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { pArray->size = 0; } -void taosArrayClearP(SArray* pArray, FDelete fp) { - if (pArray == NULL) return; - if (fp == NULL) { - pArray->size = 0; - return; - } - - for (int32_t i = 0; i < pArray->size; ++i) { - fp(*(void**)TARRAY_GET_ELEM(pArray, i)); - } - - pArray->size = 0; -} - void* taosArrayDestroy(SArray* pArray) { if (pArray) { taosMemoryFree(pArray->pData); @@ -495,6 +442,7 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void if (pArray->size <= 1) { return; } + for (int32_t i = 1; i <= pArray->size - 1; ++i) { for (int32_t j = i; j > 0; --j) { if (fn(taosArrayGetP(pArray, j), taosArrayGetP(pArray, j - 1), param) == -1) { @@ -507,7 +455,6 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void } } } - return; } int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) { @@ -539,21 +486,3 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); } -void taosArraySwap(SArray* a, SArray* b) { - if (a == NULL || b == NULL) return; - size_t t = a->size; - a->size = b->size; - b->size = t; - - uint32_t cap = a->capacity; - a->capacity = b->capacity; - b->capacity = cap; - - uint32_t elem = a->elemSize; - a->elemSize = b->elemSize; - b->elemSize = elem; - - void* data = a->pData; - a->pData = b->pData; - b->pData = data; -}