diff --git a/src/client/src/tscDelete.c b/src/client/src/tscDelete.c index d090803cbf8273b4136da8a5a47e0e72854a1148..b5d044d9dba7d98763666ee6bde7ba5de4d41ba3 100644 --- a/src/client/src/tscDelete.c +++ b/src/client/src/tscDelete.c @@ -23,7 +23,7 @@ #include "tscSubquery.h" -void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo); +void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo); // // handle error @@ -85,6 +85,7 @@ void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) { return; } + // record tscInfo("0x%"PRIx64":CDEL sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve row(s)=%d tables(s)=%d", trsupport->pParentSql->self, pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pSql->res.numOfRows, pSql->res.numOfTables); @@ -156,8 +157,9 @@ SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrie registerSqlObj(pNew); tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew); - // set vnode epset - tscSetDnodeEpSet(&pNew->epSet, pVgroupMsg); + SNewVgroupInfo vgroupInfo = {0}; + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pVgroupMsg->vgId, sizeof(pVgroupMsg->vgId), NULL, &vgroupInfo); + tscDumpEpSetFromVgroupInfo(&pNew->epSet, &vgroupInfo); return pNew; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1131e84f7d9682ce668cf72a290847359dd392b2..67c7d2b1e8d112f98a66fb72b61d82c41ee11fe5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) { return ret; } -void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) { +static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) { assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); // Issue the query to one of the vnode among a vgroup randomly. @@ -130,7 +130,7 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { taosCorEndWrite(&pCorEpSet->version); } -static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo) { +void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo) { if (pVgroupInfo == NULL) { return;} int8_t inUse = pVgroupInfo->inUse; pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 27afb08aa49eb62bdd0f5045195d1f310b4853f1..45f0b353de7863f9d3df50cba3a7684e113fbe5f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3332,6 +3332,9 @@ bool tscShouldBeFreed(SSqlObj* pSql) { STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t tableIndex) { assert(pCmd != NULL); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); + if(pQueryInfo == NULL) { + return NULL; + } return tscGetMetaInfo(pQueryInfo, tableIndex); } diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 58d21aa1b792c61aa37b800e23f74abe97898949..a9d648793d098bfb7cad05a4ca8d034fe86c1399 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -164,10 +164,19 @@ void dnodeFreeVWriteQueue(void *pWqueue) { void* waitingResultThread(void* param) { SVWriteMsg* pWrite = (SVWriteMsg* )param; + + // wait AddWaitThread to list finished + dDebug(":SDEL pVnode:%p wait AddWaitThread finished... pWrite=%p", pWrite->pVnode, pWrite); + tsem_t* psem = vnodeSemWait(pWrite->pVnode); + tsem_wait(psem); + tsem_post(psem); + dDebug(":SDEL pVnode:%p wait AddWaitThread ok pWrite=%p", pWrite->pVnode, pWrite); + + // wait request deal finished int32_t ret = tsem_wait(pWrite->rspRet.psem); + dDebug(":SDEL pVnode:%p wait request ok pWrite=%p", pWrite->pVnode, pWrite); if(ret == 0) { // success - } tsem_destroy(pWrite->rspRet.psem); tfree(pWrite->rspRet.psem); @@ -196,21 +205,33 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { if (count <= 1) return; - if(pWrite->rspRet.psem == 0) { - SRpcMsg rpcRsp = { - .handle = pWrite->rpcMsg.handle, - .pCont = pWrite->rspRet.rsp, - .contLen = pWrite->rspRet.len, - .code = pWrite->code, - }; + SRpcMsg rpcRsp = { + .handle = pWrite->rpcMsg.handle, + .pCont = pWrite->rspRet.rsp, + .contLen = pWrite->rspRet.len, + .code = pWrite->code, + }; + if(pWrite->rspRet.psem == 0) { + // no wait response rpcSendResponse(&rpcRsp); vnodeFreeFromWQueue(pVnode, pWrite); } else { - // need async to wait result in another thread - pthread_t* thread = taosCreateThread(waitingResultThread, pWrite); - // add to wait thread manager - vnodeAddWait(pVnode, thread, pWrite->rspRet.psem, pWrite); + if (vnodeWaitTooMany(pVnode)) { + // too many wait , so can not wait again + rpcRsp.code = TSDB_CODE_APP_NOT_READY; + rpcSendResponse(&rpcRsp); + vnodeFreeFromWQueue(pVnode, pWrite); + } else { + tsem_t* psem = vnodeSemWait(pVnode); + tsem_wait(psem); + // need async to wait result in another thread + pthread_t* thread = taosCreateThread(waitingResultThread, pWrite); + // add to wait thread manager + vnodeAddWait(pVnode, thread, pWrite->rspRet.psem, pWrite); + dDebug(":SDEL pVnode=%p vnode add wait %p ok, tsem_post.", pVnode, pWrite); + tsem_post(psem); + } } } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 54c34f5d8abde6575c0aa1e41df910ef7043c4b5..6266926d55f2e7057732aa9fc882791479b19098 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -249,6 +249,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied" #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing" #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state" +#define TSDB_CODE_WAIT_THREAD_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0515) //"Wait threads too many" // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID") diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 6744c681c727dda41120862e16754cc6afa0d45b..41f3eca08bdebc63c0c6eda96c95774334f09074 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -61,9 +61,9 @@ typedef struct { typedef struct { int32_t startTime; - pthread_t* pthread; - tsem_t* psem; - void* param; + pthread_t * pthread; + tsem_t * psem; + void * param; } SWaitThread; // vnodeStatus @@ -106,7 +106,10 @@ int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); // wait thread void vnodeAddWait(void* pVnode, pthread_t* pthread, tsem_t* psem, void* param); -void vnodeRemoveWait(void* pVnode, void* param); +void vnodeRemoveWait(void* pVnode, void* param); +// get wait thread count +bool vnodeWaitTooMany(void* vparam); +tsem_t* vnodeSemWait(void* vparam); #ifdef __cplusplus } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5dad98840b88355d970a967c9615c481c972fc39..623547b130edfdebc6b256967b7ff380e6051165 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2389,6 +2389,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap); pRuntimeEnv->pTableRetrieveTsMap = NULL; + + taosHashCleanup(pRuntimeEnv->pTablesRead); + pRuntimeEnv->pTablesRead = NULL; taosHashCleanup(pRuntimeEnv->pResultRowListSet); pRuntimeEnv->pResultRowListSet = NULL; diff --git a/src/query/src/qSqlParser.c b/src/query/src/qSqlParser.c index 2a79bfb434358b328936702f6d41185d62c136e4..b360fa95fc7c72c9c002c3d75e6fdc4296c0469a 100644 --- a/src/query/src/qSqlParser.c +++ b/src/query/src/qSqlParser.c @@ -1212,6 +1212,9 @@ void SqlInfoDestroy(SSqlInfo *pInfo) { taosArrayDestroy(&pInfo->funcs); if (pInfo->type == TSDB_SQL_SELECT) { destroyAllSqlNode(pInfo->list); + } else if (pInfo->type == TSDB_SQL_DELETE_DATA) { + tSqlExprDestroy(pInfo->pDelData->pWhere); + tfree(pInfo->pDelData); } else if (pInfo->type == TSDB_SQL_CREATE_TABLE) { pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo); } else if (pInfo->type == TSDB_SQL_ALTER_TABLE) { diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 47ee0d8e229c3b4e692f4472e7f15cbd72ceb40e..95931fcbc6f46bac1e535e4684750ab1874e8f0c 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -142,6 +142,7 @@ static int32_t tsRpcNum = 0; #define RPC_CONN_UDPC 1 #define RPC_CONN_TCPS 2 #define RPC_CONN_TCPC 3 +#define RPC_CONN_AUTO 4 // need tcp use tcp void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { taosInitUdpConnection, @@ -405,7 +406,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64 // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; - if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE + if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_SUBMIT || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META || type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || type == TSDB_MSG_TYPE_CM_ALTER_TABLE) diff --git a/src/tsdb/inc/tsdbCommit.h b/src/tsdb/inc/tsdbCommit.h index 2b1b466d8f8fa04a848c167873b690a55b0c8961..19304af74c1480950a46f7db3878e17a819421af 100644 --- a/src/tsdb/inc/tsdbCommit.h +++ b/src/tsdb/inc/tsdbCommit.h @@ -58,11 +58,20 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { } static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { +int64_t fid; if (key < 0) { - return (int)((key + 1) / tsTickPerDay[precision] / days - 1); + fid = ((key + 1) / tsTickPerDay[precision] / days - 1); } else { - return (int)((key / tsTickPerDay[precision] / days)); + fid = ((key / tsTickPerDay[precision] / days)); } + + // check fid over int max or min, set with int max or min + if (fid > INT32_MAX) { + fid = INT32_MAX; + } else if(fid < INT32_MIN){ + fid = INT32_MIN; + } + return (int)fid; } #endif /* _TD_TSDB_COMMIT_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 9cdb8a83aa266d04d91e07d515f0acb56703f880..13c8c506394dbea3f59b6af318d2b80a1d09adf7 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -162,7 +162,7 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { } static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) { - ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == memRowKey(pTable->lastRow))); + ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == TSKEY_INITIAL_VAL) || (pTable->lastKey == memRowKey(pTable->lastRow))); return pTable->lastKey; } diff --git a/src/tsdb/src/tsdbDelete.c b/src/tsdb/src/tsdbDelete.c index 702f46d13ea78b0ec4112ce16e3a66387e761378..180f577db2e87b0950a9c2ac20ce41e1a42a778b 100644 --- a/src/tsdb/src/tsdbDelete.c +++ b/src/tsdb/src/tsdbDelete.c @@ -208,6 +208,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision); int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision); if(sFid > eFid) { + tsdbError("vgId:%d :SDEL sFid > eFid no fid to delete. sFid=%d eFid=%d", REPO_ID(pRepo), sFid, eFid); tsdbDestroyDeleteH(&deleteH); return -1; } @@ -357,10 +358,12 @@ static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo) { tsdbFSIterInit(&(pdh->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); if (tsdbInitReadH(&(pdh->readh), pRepo) < 0) { + tsdbError("vgId:%d :SDEL tsdbInitReadH return -1. malloc memory failed.", REPO_ID(pRepo)); return -1; } if (tsdbInitDeleteTblArray(pdh) < 0) { + tsdbError("vgId:%d :SDEL tsdbInitDeleteTblArray return -1. maybe malloc memory failed or lock meta error.", REPO_ID(pRepo)); tsdbDestroyDeleteH(pdh); return -1; } @@ -666,6 +669,8 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) { // get pSchema for del table if ((pSchema = tsdbGetTableSchemaImpl(pItem->pTable, true, true, -1, -1)) == NULL) { + tsdbError("vgId:%d :SDEL tsdbGetTableSchemaImpl return NULL tid=%d. errno=%d (%s)", REPO_ID(pdh->pRepo), + pItem->pTable->tableId.tid, errno, tstrerror(terrno)); return -1; } @@ -700,6 +705,7 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) { // border block need load to delete no-use data if (tsdbLoadBlockData(pReadh, pBlock, pItem->pInfo) < 0) { + tsdbError("vgId:%d :SDEL tsdbLoadBlockData return -1. i=%d. errno=%d (%s)", REPO_ID(pdh->pRepo), i, errno, tstrerror(terrno)); return -1; } @@ -710,6 +716,7 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) { SBlock newBlock = {0}; if (tsdbWriteBlockToFile(pdh, pItem->pTable, pdh->pDCols, ppBuf, ppCBuf, ppExBuf, &newBlock) < 0) { + tsdbError("vgId:%d :SDEL tsdbWriteBlockToFile return -1. i=%d. errno=%d (%s)", REPO_ID(pdh->pRepo), i, errno, tstrerror(terrno)); return -1; } @@ -720,6 +727,8 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) { // write block info for each table if (tsdbWriteBlockInfoImpl(TSDB_DELETE_HEAD_FILE(pdh), pItem->pTable, pdh->aSupBlk, pdh->aSubBlk, ppBuf, &blkIdx) < 0) { + tsdbError("vgId:%d :SDEL tsdbWriteBlockInfoImpl return -1. tid=%d. errno=%d (%s)", REPO_ID(pdh->pRepo), + pItem->pTable->tableId.tid, errno, tstrerror(terrno)); return -1; } @@ -797,6 +806,7 @@ static int tsdbFSetDeleteImpl(SDeleteH *pdh) { // 3.WRITE INDEX OF ALL TABLE'S BLOCK TO HEAD FILE if (tsdbWriteBlockIdx(TSDB_DELETE_HEAD_FILE(pdh), pdh->aBlkIdx, ppBuf) < 0) { + tsdbError("vgId:%d :SDEL tsdbWriteBlockIdx return -1. errno=%d (%s)", REPO_ID(pdh->pRepo), terrno, tstrerror(terrno)); return -1; } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index bc26be5828eee04198c98bdc26b5bad95cc9d181..2ae215ad36288349c41d18ad000823063da37805 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -855,6 +855,8 @@ int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockI } else { pTable->lastRow = lastRow; } + } else { + taosTZfree(lastRow); } TSDB_WUNLOCK_TABLE(pTable); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index f5b92b018dabcabb228073eb7269c0ac81f44d6a..bf34c1e88d30f2259ea63ab71c46930fab2a75f7 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -1165,12 +1165,16 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit ret = tsdbQuerySTableByTagCond(pRepo, pBlock->uid, pCtlData->win.skey, pCtlData->tagCond, pCtlData->tagCondLen, &tableGroupInfo, NULL, 0); if (ret != TSDB_CODE_SUCCESS) { tsdbError(":SDEL vgId:%d failed to get child tables id from stable with tag condition. uid=%" PRIu64, REPO_ID(pRepo), pBlock->uid); + if(tableGroupInfo.pGroupList) + tsdbDestroyTableGroup(&tableGroupInfo); return ret; } tnum = tsdbTableGroupInfo(&tableGroupInfo, NULL); if (tnum == 0) { tsdbWarn(":SDEL vgId:%d super table no child tables after filter by tag. uid=%" PRIu64, REPO_ID(pRepo), pBlock->uid); + if(tableGroupInfo.pGroupList) + tsdbDestroyTableGroup(&tableGroupInfo); return TSDB_CODE_SUCCESS; } } else { @@ -1178,11 +1182,13 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit tnum = 1; } - // INIT SEM FOR ASYNC WAIT COMMIT RESULT - if (ppSem) { + // if need response (pRsp not null) , malloc ppSem for async wait response + if (ppSem && pRsp) { *ppSem = (tsem_t* )tmalloc(sizeof(tsem_t)); ret = tsem_init(*ppSem, 0, 0); if(ret != 0) { + if(tableGroupInfo.pGroupList) + tsdbDestroyTableGroup(&tableGroupInfo); return TAOS_SYSTEM_ERROR(ret); } } @@ -1219,6 +1225,9 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit } tfree(pNew); } + + if(tableGroupInfo.pGroupList) + tsdbDestroyTableGroup(&tableGroupInfo); return ret; } \ No newline at end of file diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index c21748447d2fbc16b6d10a5c8531916e25e02624..c3051464e556860178be36f3473f5e4686f6082e 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -47,8 +47,6 @@ void *taosAllocateQitem(int size); void taosFreeQitem(void *item); int taosWriteQitem(taos_queue, int type, void *item); int taosReadQitem(taos_queue, int *type, void **pitem); -// special type search Qitem -int taosSearchQitem(taos_queue, int type, void **pitem); taos_qall taosAllocateQall(); void taosFreeQall(taos_qall); diff --git a/src/util/src/terror.c b/src/util/src/terror.c index cee9245ec79abce96d8990e179cfd457791bf318..334207022d61fdcf80f26fe626edcb7de9628944 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -256,6 +256,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state") +TAOS_DEFINE_ERROR(TSDB_CODE_WAIT_THREAD_TOO_MANY, "Wait threads too many") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 0c361ad42d7f270a61f842680d12bcd921867c7b..1ffa94b0df6b63dac914649c7003d37bbedbdb24 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -160,47 +160,6 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) { return code; } -// search Qitem with type -int taosSearchQitem(taos_queue param, int type, void **pitem) { - STaosQueue *queue = (STaosQueue *)param; - STaosQnode *pNode = NULL; - STaosQnode *pPre = NULL; - int code = 0; - - pthread_mutex_lock(&queue->mutex); - - pNode = queue->head; - while (pNode) { - if(pNode->type == type) { - // found - *pitem = pNode->item; - if(pPre == NULL) { - queue->head = pNode->next; - } else { - pPre->next = pNode->next; - } - if (queue->head == NULL) - queue->tail = NULL; - // reduce number - queue->numOfItems--; - if (queue->qset) { - atomic_sub_fetch_32(&queue->qset->numOfItems, 1); - } - code = 1; - uDebug("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, type, queue->numOfItems); - break; - } - // move next - pPre = pNode; - pNode = pNode->next; - } - - pthread_mutex_unlock(&queue->mutex); - - return code; -} - - void *taosAllocateQall() { void *p = calloc(sizeof(STaosQall), 1); return p; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 95ef26576e5ae23139ccca7e64fce2dbabfb229e..a0807d19986aecc43ead9e8886e9c0942f9aa105 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -76,6 +76,9 @@ typedef struct { char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; pthread_mutex_t statusMutex; void * tqueue; // async threads queue + // thread for wait deal result to response client + SList * waitThreads; + tsem_t semWait; } SVnodeObj; #ifdef __cplusplus diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index a7f5919449736e12940b577ad151e45ca3d544db..7b99aebbf6e3386d321a90cbc49e13ddbd7f25d3 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -274,6 +274,9 @@ int32_t vnodeOpen(int32_t vgId) { tsem_init(&pVnode->sem, 0, 0); pthread_mutex_init(&pVnode->statusMutex, NULL); vnodeSetInitStatus(pVnode); + // wait thread init + tsem_init(&pVnode->semWait, 0, 1); + pVnode->waitThreads = tdListNew(sizeof(SWaitThread)); tsdbIncCommitRef(pVnode->vgId); @@ -422,14 +425,29 @@ int32_t vnodeOpen(int32_t vgId) { } #define LOOP_CNT 10 -void freeWaitThread(SVnodeObj* pVnode) { - // check wait thread empty - int type = 0; +void vnodeStopWaitingThread(SVnodeObj* pVnode) { + // check wait thread empty SWaitThread* pWaitThread = NULL; - while(taosReadQitem(pVnode->tqueue, &type, (void** )&pWaitThread) > 0) { + vDebug("vgId:%d :SDEL stop waiting thread count=%d", pVnode->vgId, listNEles(pVnode->waitThreads)); + if(listNEles(pVnode->waitThreads) == 0) { + return; + } + vInfo("vgId:%d :SDEL stop waiting thread not zero. count=%d", pVnode->vgId, listNEles(pVnode->waitThreads)); + + // get lock + tsem_wait(&pVnode->semWait); + + // loop stop + while (1) { + SListNode * pNode = tdListPopHead(pVnode->waitThreads); + if(pNode == NULL) + break; + // thread is running + pWaitThread = (SWaitThread *)pNode->data; int32_t loop = LOOP_CNT; while (taosThreadRunning(pWaitThread->pthread)) { + vInfo("vgId:%d :SDEL loop=%d thread runing post to quit. pthread=%p", pVnode->vgId, loop, pWaitThread->pthread); // only post once if(loop == LOOP_CNT) tsem_post(pWaitThread->psem); @@ -441,6 +459,7 @@ void freeWaitThread(SVnodeObj* pVnode) { // free all if(loop == 0) { + vInfo("vgId:%d :SDEL force kill thread to quit. pthread=%p pWrite=%p", pVnode->vgId, pWaitThread->pthread, pWaitThread->param); // thread not stop , so need kill taosDestoryThread(pWaitThread->pthread); // write msg need remove from queue @@ -448,13 +467,17 @@ void freeWaitThread(SVnodeObj* pVnode) { if (pWrite) vnodeFreeFromWQueue(pWrite->pVnode, pWrite); } else { + vInfo("vgId:%d :SDEL quit thread ok. pthread=%p pWrite=%p", pVnode->vgId, pWaitThread->pthread, pWaitThread->param); free(pWaitThread->pthread); } tsem_destroy(pWaitThread->psem); - taosFreeQitem(pWaitThread); + + // free node + free(pNode); } - taosCloseQueue(pVnode->tqueue); + // unlock + tsem_post(&pVnode->semWait); } int32_t vnodeClose(int32_t vgId) { @@ -481,7 +504,10 @@ int32_t vnodeClose(int32_t vgId) { void vnodeDestroy(SVnodeObj *pVnode) { int32_t code = 0; int32_t vgId = pVnode->vgId; - + + // stop wait thread if have + vnodeStopWaitingThread(pVnode); + if (pVnode->qMgmt) { qCleanupQueryMgmt(pVnode->qMgmt); pVnode->qMgmt = NULL; @@ -547,6 +573,8 @@ void vnodeDestroy(SVnodeObj *pVnode) { dnodeSendStatusMsgToMnode(); } + pVnode->waitThreads = tdListFree(pVnode->waitThreads); + tsem_destroy(pVnode->semWait); tsem_destroy(&pVnode->sem); pthread_mutex_destroy(&pVnode->statusMutex); free(pVnode); @@ -613,34 +641,59 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { // wait thread void vnodeAddWait(void* vparam, pthread_t* pthread, tsem_t* psem, void* param) { - SVnodeObj* pVnode = (SVnodeObj* )vparam; - if(pVnode->tqueue == NULL) { - pVnode->tqueue = taosOpenQueue(); - } + SVnodeObj* pVnode = (SVnodeObj* )vparam; + SWaitThread waitThread = {0}; - SWaitThread* pWaitThread = (SWaitThread* )taosAllocateQitem(sizeof(SWaitThread)); - pWaitThread->pthread = pthread; - pWaitThread->startTime = taosGetTimestampSec(); - pWaitThread->psem = psem; - pWaitThread->param = param; + waitThread.pthread = pthread; + waitThread.startTime = taosGetTimestampSec(); + waitThread.psem = psem; + waitThread.param = param; - int32_t crc = crc32c_sf(0, (crc_stream)param, sizeof(void* )); - taosWriteQitem(pVnode->tqueue, crc, pWaitThread); + // append + tdListAppend(pVnode->waitThreads, &waitThread); + vDebug("vgId:%d :SDEL add wait thread %p wait list count=%d ", pVnode->vgId, param, listNEles(pVnode->waitThreads)); } // called in wait thread void vnodeRemoveWait(void* vparam, void* param) { SVnodeObj* pVnode = (SVnodeObj* )vparam; - int32_t crc = crc32c_sf(0, (crc_stream)param, sizeof(void* )); + SListIter iter = {0}; + + tsem_wait(&pVnode->semWait); + tdListInitIter(pVnode->waitThreads, &iter, TD_LIST_FORWARD); - SWaitThread* pWaitThread = NULL; - taosSearchQitem(pVnode->tqueue, crc, (void** )&pWaitThread); - if (pWaitThread == NULL) { - // not found - return ; + while (1) { + SListNode* pNode = tdListNext(&iter); + if (pNode == NULL) + break; + + SWaitThread * pWaitThread = (SWaitThread *)pNode->data; + if (pWaitThread->param == param) { + // found , free SWaitThread memeber + free(pWaitThread->pthread); + tdListPopNode(pVnode->waitThreads, pNode); + vDebug("vgId:%d :SDEL removed wait thread %p wait list count=%d ", pVnode->vgId, param, listNEles(pVnode->waitThreads)); + // free pListNode self + free(pNode); + break; + } } + tsem_post(&pVnode->semWait); +} - // free thread - free(pWaitThread->pthread); - taosFreeQitem(pWaitThread); +// get wait thread count +bool vnodeWaitTooMany(void* vparam) { + SVnodeObj* pVnode = (SVnodeObj* )vparam; + int32_t count = listNEles(pVnode->waitThreads); + if( count > 32 ) { + vError("vgId:%d :SDEL wait threads too many. count=%d", pVnode->vgId, count); + return true; + } + + return false; +} + +tsem_t* vnodeSemWait(void* vparam) { + SVnodeObj* pVnode = (SVnodeObj* )vparam; + return &pVnode->semWait; } \ No newline at end of file diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 1c3a7ab9fc6a3c0c94384648e76b388bdb066dc1..e3371c79db76f4a3875b505aa7f56c67969c49ce 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -63,6 +63,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara SRspRet *pRspRet = NULL; if (pWrite != NULL) pRspRet = &pWrite->rspRet; + // if wal and forward write , no need response + if( qtype == TAOS_QTYPE_WAL || qtype == TAOS_QTYPE_FWD) { + pRspRet = NULL; + } if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId, @@ -107,7 +111,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara code = walWrite(pVnode->wal, pHead); } if (code < 0) { - if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1); + if (syncCode > 0 && pWrite) atomic_sub_fetch_32(&pWrite->processedCount, 1); vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code); pHead->version = 0; return code; @@ -118,7 +122,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara // write data locally code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet); if (code < 0) { - if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1); + if (syncCode > 0 && pWrite) atomic_sub_fetch_32(&pWrite->processedCount, 1); return code; }