Unverified commit 8b57184a, authored by Haojun Liao, committed by GitHub

Merge pull request #21123 from taosdata/fix/liaohj_main

refactor: remove assert.
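This merge removes hard ASSERT calls across the executor, name, cache, hash, scheduler, and skip-list code, replacing them with runtime checks that degrade gracefully: returning an error code or NULL, clamping an argument, or logging and moving on. A minimal sketch of the general before/after pattern (illustrative names, not TDengine code):

```c
#include <assert.h>
#include <stddef.h>

// Before: the precondition is enforced only in debug builds, and a
// violation aborts the entire process.
int get_item_old(const int *arr, size_t len, size_t idx, int *out) {
  assert(idx < len);
  *out = arr[idx];
  return 0;
}

// After: the precondition becomes a runtime check, and a bad argument is
// reported to the caller instead of crashing a production server.
int get_item_new(const int *arr, size_t len, size_t idx, int *out) {
  if (arr == NULL || out == NULL || idx >= len) {
    return -1;  // mirrors the `return -1` / `return NULL` fixes below
  }
  *out = arr[idx];
  return 0;
}
```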
......@@ -208,8 +208,6 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
-int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem);
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
......
......@@ -122,10 +122,8 @@ int32_t tNameLen(const SName* name) {
int32_t len2 = (int32_t)strlen(name->tname);
if (name->type == TSDB_DB_NAME_T) {
-ASSERT(len2 == 0);
return len + len1 + TSDB_NAME_DELIMITER_LEN;
} else {
-ASSERT(len2 > 0);
return len + len1 + len2 + TSDB_NAME_DELIMITER_LEN * 2;
}
}
......
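For reference, tNameLen computes the length of the fully qualified name: `acct.db` for a database-level name (one delimiter) or `acct.db.tbl` for a table-level name (two delimiters); a non-empty tname is simply no longer asserted. A standalone sketch of that arithmetic (the struct layout and a delimiter width of 1 are assumptions, not the real SName):

```c
#include <stdio.h>
#include <string.h>

#define TSDB_NAME_DELIMITER_LEN 1  // assumed width of the '.' separator

// Hypothetical stand-in for SName with the components already split.
typedef struct { char acct[32]; char db[64]; char tbl[192]; } Name;

// Mirrors the tNameLen arithmetic shown in the hunk above.
static int name_len(const Name *n, int isDbName) {
  int len  = (int)strlen(n->acct);
  int len1 = (int)strlen(n->db);
  int len2 = (int)strlen(n->tbl);
  if (isDbName) {
    return len + len1 + TSDB_NAME_DELIMITER_LEN;           // acct '.' db
  }
  return len + len1 + len2 + TSDB_NAME_DELIMITER_LEN * 2;  // acct '.' db '.' tbl
}

int main(void) {
  Name n = {"1", "test_db", "t1"};
  printf("db-level: %d\n", name_len(&n, 1));     // 1 + 7 + 1 = 9
  printf("table-level: %d\n", name_len(&n, 0));  // 1 + 7 + 2 + 2 = 12
  return 0;
}
```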
......@@ -555,7 +555,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId);
// start to restore all stream tasks
-tqStartStreamTasks(pVnode->pTq);
+if (tsDisableStream) {
+  vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId);
+} else {
+  vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId);
+  tqStartStreamTasks(pVnode->pTq);
+}
}
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
......
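The new vnodeRestoreFinish gates stream-task restore behind the tsDisableStream configuration switch and logs the decision either way, so operators can tell why tasks did or did not come up after a sync restore. A compilable sketch of that gate (printf standing in for vInfo):

```c
#include <stdbool.h>
#include <stdio.h>

static bool tsDisableStream = false;  // config switch, named after the flag in the diff

static void startStreamTasks(int vgId) { printf("vgId:%d, stream tasks started\n", vgId); }

// Consult the switch first; log the decision in both branches.
static void restoreFinish(int vgId) {
  if (tsDisableStream) {
    printf("vgId:%d, not restoring stream tasks, since disabled\n", vgId);
  } else {
    printf("vgId:%d, start to restore stream tasks\n", vgId);
    startStreamTasks(vgId);
  }
}
```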
......@@ -139,7 +139,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
-ASSERT(numOfBlocks == 1);
taosArrayPush(pInfo->pBlockLists, input);
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_BLOCK) {
......@@ -313,7 +312,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
-nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo);
terrno = code;
return NULL;
......@@ -801,7 +799,11 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) {
return;
}
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
if (pTaskInfo->pRoot != NULL) {
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
} else {
qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
}
printTaskExecCostInLog(pTaskInfo); // print the query cost summary
doDestroyTask(pTaskInfo);
......@@ -854,15 +856,6 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
-#if 0
-int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
-  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
-  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
-  taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
-  return 0;
-}
-#endif
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
......@@ -897,8 +890,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
-ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
-ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);
+ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
qInfo("save stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
......@@ -914,9 +906,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
-ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
-ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);
+ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
qInfo("save stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
......@@ -929,8 +920,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
-ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
-ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);
+ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
......@@ -991,7 +981,6 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
if (pOperator->numOfDownstream > 1) {
qError("unexpected stream, multiple downstream");
-/*ASSERT(0);*/
return -1;
}
return 0;
......
......@@ -99,6 +99,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
if (NULL == (*pTaskInfo)->pRoot) {
int32_t code = (*pTaskInfo)->code;
doDestroyTask(*pTaskInfo);
+(*pTaskInfo) = NULL;
return code;
} else {
return TSDB_CODE_SUCCESS;
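Resetting the caller's out-pointer after doDestroyTask closes a use-after-free window on the failure path: qCreateStreamExecTaskInfo above can then hand the handle to qDestroyTask (or ignore it) without touching freed memory. A sketch of the out-parameter convention (simplified types, hypothetical names):

```c
#include <stdlib.h>

typedef struct { int code; } Task;  // stand-in for SExecTaskInfo

// On failure the out-parameter is freed *and* reset to NULL, so the caller
// can never observe, reuse, or double-free a dangling handle.
static int create_task(Task **ppTask, int simulateFailure) {
  *ppTask = calloc(1, sizeof(Task));
  if (*ppTask == NULL) return -1;
  if (simulateFailure) {
    int code = (*ppTask)->code;
    free(*ppTask);
    *ppTask = NULL;  // mirrors the added `(*pTaskInfo) = NULL;`
    return code;
  }
  return 0;
}
```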
......@@ -206,11 +207,14 @@ static void freeBlock(void* pParam) {
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
destroyOperator(pTaskInfo->pRoot);
+pTaskInfo->pRoot = NULL;
cleanupQueriedTableScanInfo(&pTaskInfo->schemaInfo);
+cleanupStreamInfo(&pTaskInfo->streamInfo);
if (!pTaskInfo->localFetch.localExec) {
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
+pTaskInfo->pSubplan = NULL;
}
taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
......
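doDestroyTask now nulls each member as it is released, which is what lets the new qDestroyTask branch above test pRoot safely and makes a second destruction pass harmless. A sketch of the null-after-free discipline (simplified struct):

```c
#include <stdlib.h>

typedef struct {
  void *root;
  void *subplan;
} Task;

// Every freed member is reset to NULL: later NULL checks stay valid and an
// accidental second call degrades to a no-op instead of a double free.
static void destroy_task(Task *t) {
  free(t->root);
  t->root = NULL;
  free(t->subplan);
  t->subplan = NULL;
}
```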
......@@ -255,7 +255,10 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
}
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
-ASSERT(cnt <= pArray->size);
+if (cnt > pArray->size) {
+  cnt = pArray->size;
+}
pArray->size = pArray->size - cnt;
if (pArray->size == 0 || cnt == 0) {
return;
......
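taosArrayPopFrontBatch now clamps the pop count to the array size, turning an out-of-range request into "pop everything" instead of an abort. A sketch of the same clamp-then-shift logic on a plain C buffer (SArray internals assumed):

```c
#include <stddef.h>
#include <string.h>

typedef struct {
  char  *data;      // contiguous element storage
  size_t size;      // number of live elements
  size_t elemSize;  // bytes per element
} Array;

static void pop_front_batch(Array *a, size_t cnt) {
  if (cnt > a->size) cnt = a->size;      // the check that replaced the ASSERT
  a->size -= cnt;
  if (a->size == 0 || cnt == 0) return;  // nothing left to move
  memmove(a->data, a->data + cnt * a->elemSize, a->size * a->elemSize);
}
```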
......@@ -264,7 +264,6 @@ static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
static void removeNodeInEntryList(SCacheEntry *pe, SCacheNode *prev, SCacheNode *pNode) {
if (prev == NULL) {
-ASSERT(pe->next == pNode);
pe->next = pNode->pNext;
} else {
prev->pNext = pNode->pNext;
......@@ -464,7 +463,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev);
if (pNode != NULL) {
int32_t ref = T_REF_INC(pNode);
-ASSERT(ref > 0);
}
taosRUnLockLatch(&pe->latch);
......@@ -607,7 +605,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
pNode->data, ref);
if (ref > 0) {
-ASSERT(pNode->pTNodeHeader == NULL);
taosAddToTrashcan(pCacheObj, pNode);
} else { // ref == 0
atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
......@@ -916,7 +913,6 @@ void taosStopCacheRefreshWorker(void) {
size_t taosCacheGetNumOfObj(const SCacheObj *pCacheObj) { return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash; }
SCacheIter *taosCacheCreateIter(const SCacheObj *pCacheObj) {
-ASSERT(pCacheObj != NULL);
SCacheIter *pIter = taosMemoryCalloc(1, sizeof(SCacheIter));
pIter->pCacheObj = (SCacheObj *)pCacheObj;
pIter->entryIndex = -1;
......@@ -966,12 +962,8 @@ bool taosCacheIterNext(SCacheIter *pIter) {
SCacheNode *pNode = pEntry->next;
for (int32_t i = 0; i < pEntry->num; ++i) {
-ASSERT(pNode != NULL);
pIter->pCurrent[i] = pNode;
int32_t ref = T_REF_INC(pIter->pCurrent[i]);
-ASSERT(ref >= 1);
pNode = pNode->pNext;
}
......
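The assert deleted from removeNodeInEntryList encoded a real invariant: prev == NULL means the node being removed is the entry's list head. The unlink still relies on that property, it just no longer aborts on violation. A sketch of the logic on a simplified singly-linked list:

```c
#include <stddef.h>

typedef struct Node { struct Node *pNext; } Node;
typedef struct { Node *next; } Entry;  // 'next' points at the list head

// prev == NULL implies n is the head (what the old ASSERT verified);
// otherwise n is spliced out from behind prev.
static void unlink_node(Entry *e, Node *prev, Node *n) {
  if (prev == NULL) {
    e->next = n->pNext;
  } else {
    prev->pNext = n->pNext;
  }
  n->pNext = NULL;
}
```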
......@@ -259,8 +259,6 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
pHashObj->freeFp = NULL;
pHashObj->callbackFp = NULL;
-ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->hashList = (SHashEntry **)taosMemoryMalloc(pHashObj->capacity * sizeof(void *));
if (pHashObj->hashList == NULL) {
taosMemoryFree(pHashObj);
......@@ -343,7 +341,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
while (pNode) {
if ((pNode->keyLen == keyLen) && (*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 &&
pNode->removed == 0) {
-ASSERT(pNode->hashVal == hashVal);
break;
}
......@@ -701,8 +698,6 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode) {
pNode->next = pEntry->next;
pEntry->next = pNode;
-ASSERT(pNode->next != pNode);
pEntry->num += 1;
}
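The removed `ASSERT((capacity & (capacity - 1)) == 0)` checked that the table size is a power of two, the property that lets a hash value be reduced to a slot with a mask instead of a modulo; presumably the init path still guarantees it by rounding the requested capacity up. A sketch of that invariant (not the literal taosHash code):

```c
#include <stdint.h>

// What the deleted assert verified.
static int is_pow2(uint32_t v) { return v != 0 && (v & (v - 1)) == 0; }

// Typical way an init path enforces it: round up to the next power of two.
static uint32_t round_up_pow2(uint32_t v) {
  if (v == 0) return 1;
  v -= 1;
  v |= v >> 1;  v |= v >> 2;  v |= v >> 4;
  v |= v >> 8;  v |= v >> 16;
  return v + 1;
}

// The payoff: slot selection is a mask, valid only while is_pow2(cap) holds.
static uint32_t slot_of(uint32_t hashVal, uint32_t cap) { return hashVal & (cap - 1); }
```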
......@@ -816,19 +811,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
/*uint16_t prevRef = atomic_load_16(&pNode->refCount);*/
uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1);
-#if 0
-ASSERT(prevRef < afterRef);
-// the reference count value is overflow, which will cause the delete node operation immediately.
-if (prevRef > afterRef) {
-  uError("hash entry ref count overflow, prev ref:%d, current ref:%d", prevRef, afterRef);
-  // restore the value
-  atomic_sub_fetch_16(&pNode->refCount, 1);
-  data = NULL;
-} else {
-  data = GET_HASH_NODE_DATA(pNode);
-}
-#endif
data = GET_HASH_NODE_DATA(pNode);
if (afterRef >= MAX_WARNING_REF_COUNT) {
......
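The dead `#if 0` block in taosHashIterate guarded against the 16-bit reference count overflowing; what survives is a plain atomic increment plus a high-water warning. A sketch of that acquire path using C11 atomics (the MAX_WARNING_REF_COUNT value is an assumption):

```c
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_WARNING_REF_COUNT 10000  // assumed threshold, name taken from the diff

typedef struct {
  _Atomic uint16_t refCount;
  void            *data;
} HashNode;

// Bump the ref count, hand out the payload, and warn when the count looks
// suspiciously high (a likely sign of leaked references).
static void *acquire_node(HashNode *pNode) {
  uint16_t afterRef = (uint16_t)(atomic_fetch_add(&pNode->refCount, 1) + 1);
  if (afterRef >= MAX_WARNING_REF_COUNT) {
    fprintf(stderr, "hash node ref count too high: %u\n", (unsigned)afterRef);
  }
  return pNode->data;
}
```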
......@@ -115,8 +115,6 @@ void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
}
void tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) {
-ASSERT((pTree->totalSources & 0x1) == 0);
tMergeTreeInit(pTree);
for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) {
tMergeTreeAdjust(pTree, i);
......
......@@ -137,7 +137,6 @@ void *taosProcessSchedQueue(void *scheduler) {
while (1) {
if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
-ASSERT(0);
}
if (atomic_load_8(&pSched->stop)) {
break;
......@@ -145,7 +144,6 @@ void *taosProcessSchedQueue(void *scheduler) {
if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
-ASSERT(0);
}
msg = pSched->queue[pSched->fullSlot];
......@@ -154,12 +152,10 @@ void *taosProcessSchedQueue(void *scheduler) {
if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
-ASSERT(0);
}
if ((ret = tsem_post(&pSched->emptySem)) != 0) {
uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
-ASSERT(0);
}
if (msg.fp)
......@@ -187,12 +183,10 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
-ASSERT(0);
}
if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
-ASSERT(0);
}
pSched->queue[pSched->emptySlot] = *pMsg;
......@@ -200,12 +194,10 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
-ASSERT(0);
}
if ((ret = tsem_post(&pSched->fullSem)) != 0) {
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
-ASSERT(0);
}
return ret;
}
......
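Across tsched.c the change is uniform: keep the uFatal log, drop the ASSERT(0) that followed it, and let the nonzero return code propagate so the worker thread or caller can unwind. A sketch of that log-and-continue policy with pthreads (fprintf standing in for uFatal):

```c
#include <pthread.h>
#include <stdio.h>
#include <string.h>

// A failed lock is reported as fatal but no longer aborts the process;
// the caller sees the nonzero code and can back out.
static int lock_queue(pthread_mutex_t *mutex, const char *label) {
  int ret = pthread_mutex_lock(mutex);
  if (ret != 0) {
    fprintf(stderr, "lock %s queueMutex failed(%s)\n", label, strerror(ret));
  }
  return ret;
}
```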
......@@ -268,8 +268,9 @@ SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) {
}
SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *val, int32_t type, int32_t order) {
-ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
-ASSERT(pSkipList != NULL);
+if (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC) {
+  return NULL;
+}
SSkipListIterator *iter = doCreateSkipListIterator(pSkipList, order);
if (val == NULL) {
......@@ -585,7 +586,6 @@ static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) {
}
}
-ASSERT(level <= pSkipList->maxLevel);
return level;
}
......
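tSkipListCreateIterFromVal now validates its order flag and returns NULL rather than asserting, pushing the failure to the caller. A sketch of the validate-then-construct shape (simplified types; the NULL-list check here also covers the second dropped assert):

```c
#include <stddef.h>
#include <stdlib.h>

enum { ORDER_ASC = 1, ORDER_DESC = 2 };  // stand-ins for TSDB_ORDER_ASC/DESC

typedef struct { int order; } Iter;

// Invalid input yields NULL for the caller to handle, not a library abort.
static Iter *create_iter(const void *list, int order) {
  if (list == NULL || (order != ORDER_ASC && order != ORDER_DESC)) {
    return NULL;
  }
  Iter *it = calloc(1, sizeof(Iter));
  if (it != NULL) it->order = order;
  return it;
}
```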