diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d9087f59c6f8062e0cfb5f832eda48c825cc3960..9104e8a4234a48692f73d9d4c3225e9cd0668ee0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1352,6 +1352,9 @@ typedef struct { typedef struct { int32_t code; + char tbFName[TSDB_TABLE_FNAME_LEN]; + int32_t sversion; + int32_t tversion; } SResReadyRsp; typedef struct { diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index f4ccc05208d57179228cc8a2f828b4e3afcac756..389fe25780f5bbdde734ba1248dd5e04c4a14d3f 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -57,6 +57,12 @@ typedef struct SIndexMeta { } SIndexMeta; +typedef struct STbVerInfo { + char tbFName[TSDB_TABLE_FNAME_LEN]; + int32_t sversion; + int32_t tversion; +} STbVerInfo; + /* * ASSERT(sizeof(SCTableMeta) == 24) * ASSERT(tableType == TSDB_CHILD_TABLE) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index fc39e80c1e783914518d0b931f20310aad73df18..13ac43bc392d14407bfc04b494f4e5d49a3f1fa3 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -45,7 +45,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC catalogUpdateUserAuthInfo(pCatalog, rsp); } - tFreeSUserAuthBatchRsp(&batchRsp); + taosArrayDestroy(batchRsp.pArray); return TSDB_CODE_SUCCESS; } @@ -285,6 +285,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { int64_t *rid = pIter; SRequestObj *pRequest = acquireRequest(*rid); if (NULL == pRequest) { + pIter = taosHashIterate(pObj->pRequests, pIter); continue; } @@ -544,7 +545,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { } taosArrayPush(pBatchReq->reqs, pOneReq); - hbClearClientHbReq(pOneReq); + //hbClearClientHbReq(pOneReq); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } @@ -565,6 +566,11 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { tFreeReqKvHash(pOneReq->info); taosHashClear(pOneReq->info); + if (pOneReq->query) { + taosArrayDestroy(pOneReq->query->queryDesc); + taosMemoryFreeClear(pOneReq->query); + } + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } } @@ -649,6 +655,9 @@ static int32_t hbCreateThread() { } static void hbStopThread() { + if (0 == atomic_load_8(&clientHbMgr.inited)) { + return; + } if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) { tscDebug("hb thread already stopped"); return; @@ -745,13 +754,13 @@ int hbMgrInit() { hbMgrInitHandle(); // init backgroud thread - /*hbCreateThread();*/ + hbCreateThread(); return 0; } void hbMgrCleanUp() { - // hbStopThread(); + hbStopThread(); // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f493f02cd65a6b2c8c51d88327a9dc62be63a3de..386283b5b58d04b68a00f89100d64b1312e82b46 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -300,6 +300,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList schedulerFreeJob(pRequest->body.queryJob); } + *pRes = res.res; + pRequest->code = code; terrno = code; return pRequest->code; @@ -347,6 +349,23 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) { taosArrayPush(pArray, &tbSver); } } else if (TDMT_VND_QUERY == pRequest->type) { + SArray* pTbArray = (SArray*)res; + int32_t tbNum = taosArrayGetSize(pTbArray); + if (tbNum <= 0) { + return TSDB_CODE_SUCCESS; + } + + pArray = taosArrayInit(tbNum, sizeof(STbSVersion)); + if (NULL == pArray) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < tbNum; ++i) { + STbVerInfo* tbInfo = taosArrayGet(pTbArray, i); + STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion}; + taosArrayPush(pArray, &tbSver); + } } SCatalog* pCatalog = NULL; @@ -371,6 +390,7 @@ void freeRequestRes(SRequestObj* pRequest, void* res) { if (TDMT_VND_SUBMIT == pRequest->type) { tFreeSSubmitRsp((SSubmitRsp*)res); } else if (TDMT_VND_QUERY == pRequest->type) { + taosArrayDestroy((SArray *)res); } } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 7e99c5583d69fcf708d39206aab3ccca448ddeb8..cd57e76b2cc6eacee39be027563d52d04a1ea038 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -340,8 +340,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb if (pHbReq->query) { SQueryHbReqBasic *pBasic = pHbReq->query; - SRpcConnInfo connInfo = {0}; - rpcGetConnInfo(pMsg->info.handle, &connInfo); + SRpcConnInfo connInfo = pMsg->conn; SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); if (pConn == NULL) { diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index b59175d86cabd85f28334b95e42afd4b84ab1938..f2a42e3083d0777eecb3ffd9db2a0359c7939ff6 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -233,6 +233,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew); taosWLockLatch(&pOld->lock); pOld->updateTime = pNew->updateTime; + pOld->authVersion = pNew->authVersion; memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); TSWAP(pOld->readDbs, pNew->readDbs); TSWAP(pOld->writeDbs, pNew->writeDbs); @@ -765,6 +766,7 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_ continue; } + pUsers[i].version = ntohl(pUsers[i].version); if (pUser->authVersion <= pUsers[i].version) { mndReleaseUser(pMnode, pUser); continue; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 23957d1a6bfdc0cd87dd6f94727d0e1dc76adb57..d5301e3624fd632d31e48e6eae0944d5ceebd8be 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -2885,7 +2885,7 @@ _return: -int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver) { +int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver, int32_t *tbType, uint64_t *suid, char* stbName) { *sver = -1; if (NULL == pCtg->dbCache) { @@ -2903,14 +2903,12 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* return TSDB_CODE_SUCCESS; } - int32_t tbType = 0; - uint64_t suid = 0; CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); STableMeta* tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname)); if (tbMeta) { - tbType = tbMeta->tableType; - suid = tbMeta->suid; - if (tbType != TSDB_CHILD_TABLE) { + *tbType = tbMeta->tableType; + *suid = tbMeta->suid; + if (*tbType != TSDB_CHILD_TABLE) { *sver = tbMeta->sversion; } } @@ -2921,44 +2919,49 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* return TSDB_CODE_SUCCESS; } - if (tbType != TSDB_CHILD_TABLE) { + if (*tbType != TSDB_CHILD_TABLE) { ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname); + ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } - ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, suid); + ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, *suid); CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock); - STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &suid, sizeof(suid)); + STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, suid, sizeof(*suid)); if (NULL == stbMeta || NULL == *stbMeta) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("stb not in stbCache, suid:%"PRIx64, suid); + ctgDebug("stb not in stbCache, suid:%"PRIx64, *suid); return TSDB_CODE_SUCCESS; } - if ((*stbMeta)->suid != suid) { + if ((*stbMeta)->suid != *suid) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); ctgReleaseDBCache(pCtg, dbCache); - ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, suid, (*stbMeta)->suid); + ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, *suid, (*stbMeta)->suid); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } + size_t nameLen = 0; + char* name = taosHashGetKey(*stbMeta, &nameLen); + + strncpy(stbName, name, nameLen); + stbName[nameLen] = 0; + *sver = (*stbMeta)->sversion; CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname); + ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } - int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) { CTG_API_ENTER(); @@ -2977,9 +2980,26 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm continue; } - ctgGetTbSverFromCache(pCtg, &name, &sver); + int32_t tbType = 0; + uint64_t suid = 0; + char stbName[TSDB_TABLE_FNAME_LEN]; + ctgGetTbSverFromCache(pCtg, &name, &sver, &tbType, &suid, stbName); if (sver >= 0 && sver < pTb->sver) { - catalogRemoveTableMeta(pCtg, &name); //TODO REMOVE STB FROM CACHE + switch (tbType) { + case TSDB_CHILD_TABLE: { + SName stb = name; + strcpy(stb.tname, stbName); + catalogRemoveTableMeta(pCtg, &stb); + break; + } + case TSDB_SUPER_TABLE: + case TSDB_NORMAL_TABLE: + catalogRemoveTableMeta(pCtg, &name); + break; + default: + ctgError("ignore table type %d", tbType); + break; + } } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6d308d7221983ab677859b037725b3161f4e51c5..125af9749a8b29433405507209c531aff1a0906b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -187,8 +187,16 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab *sversion = pTaskInfo->schemaVer.sversion; *tversion = pTaskInfo->schemaVer.tversion; - strcpy(dbName, pTaskInfo->schemaVer.dbname); - strcpy(tableName, pTaskInfo->schemaVer.tablename); + if (pTaskInfo->schemaVer.dbname) { + strcpy(dbName, pTaskInfo->schemaVer.dbname); + } else { + dbName[0] = 0; + } + if (pTaskInfo->schemaVer.tablename) { + strcpy(tableName, pTaskInfo->schemaVer.tablename); + } else { + tableName[0] = 0; + } return 0; } \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 750554e8288fb8dca1d875ca0841794ff46252c0..ccffe775141fe57323707de44c73057edd0e90e5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4574,10 +4574,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* return pExprs; } -static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model) { +static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) { SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + pTaskInfo->schemaVer.dbname = strdup(dbFName); pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; @@ -4992,16 +4993,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod return NULL; } - const char* tname = pTaskInfo->schemaVer.tablename; for (int32_t i = 0; i < numOfCols; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - if (tname != NULL && (pTaskInfo->schemaVer.dbname == NULL) && - strncmp(pColNode->tableName, tname, tListLen(pColNode->tableName)) == 0) { - pTaskInfo->schemaVer.dbname = strdup(pColNode->dbName); - } - SColMatchInfo c = {0}; c.output = true; c.colId = pColNode->colId; @@ -5097,7 +5092,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead uint64_t queryId = pPlan->id.queryId; int32_t code = TSDB_CODE_SUCCESS; - *pTaskInfo = createExecTaskInfo(queryId, taskId, model); + *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName); if (*pTaskInfo == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _complete; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 58886d706af0e1091ddb9ac805deb50510d85a37..f4ba2fca8146b37a3e25bc7173488dd85c8e48d0 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -58,7 +58,7 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen // 3. valid column names for (int32_t j = i + 1; j < numOfCols; ++j) { - if (strncasecmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) { + if (strncmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) { return false; } } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index bcc17a9ae9c0bffe95e1df75746c36021f2d9a6e..511327658f14a58e25460f979a4ebb197c8d4b8c 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -131,6 +131,7 @@ typedef struct SQWTaskCtx { void *taskHandle; void *sinkHandle; SSubplan *plan; + STbVerInfo tbInfo; } SQWTaskCtx; typedef struct SQWSchStatus { diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index 93994c8287c9a5866a6dbc0969977e91f8997f15..6453cff70095b246f0ede7034da07536b1075f2f 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); -int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code); +int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo); int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); void qwFreeFetchRsp(void *msg); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index db63c71d1123cec32e810a5583deb0a936688070..cb7ff08fa3b8c869a101ad202c274490a49091bd 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -718,6 +718,16 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void return TSDB_CODE_SUCCESS; } + +void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { + char dbFName[TSDB_DB_FNAME_LEN]; + char tbName[TSDB_TABLE_NAME_LEN]; + + qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); + + sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName); +} + int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; SQWTaskCtx *ctx = NULL; @@ -899,6 +909,11 @@ _return: qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED); } + if (readyConnection) { + qwBuildAndSendReadyRsp(readyConnection, code, ctx ? &ctx->tbInfo : NULL); + QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); + } + if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); @@ -910,11 +925,6 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - if (readyConnection) { - qwBuildAndSendReadyRsp(readyConnection, code); - QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); - } - if (code) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); } @@ -975,6 +985,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex atomic_store_ptr(&ctx->sinkHandle, sinkHandle); if (pTaskInfo && sinkHandle) { + qwSaveTbVersionInfo(pTaskInfo, ctx); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); } @@ -1047,7 +1058,7 @@ _return: } if (needRsp) { - qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); + qwBuildAndSendReadyRsp(&qwMsg->connInfo, code, NULL); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1150,8 +1161,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { - atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle); - atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle); + ctx->dataConnInfo = qwMsg->connInfo; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 562e550bdcef2f2119e4c90741cf781f19ac1102..0a192eb795b689285831f366aff30af4a3743b27 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -63,9 +63,14 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code) { +int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) { SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); pRsp->code = code; + if (tbInfo) { + strcpy(pRsp->tbFName, tbInfo->tbFName); + pRsp->sversion = tbInfo->sversion; + pRsp->tversion = tbInfo->tversion; + } SRpcMsg rpcRsp = { .msgType = TDMT_VND_RES_READY_RSP, diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 5a6fcee759d86ce85c9d67eb28cb5950401b9857..be92de774b8964c975360467600f570c2540c907 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -39,12 +39,6 @@ enum { SCH_WRITE, }; -typedef enum { - SCH_RES_TYPE_QUERY, - SCH_RES_TYPE_FETCH, -} SCH_RES_TYPE; - - typedef struct SSchTrans { void *transInst; void *transHandle; @@ -197,7 +191,7 @@ typedef struct SSchJob { int32_t errCode; SArray *errList; // SArray SRWLatch resLock; - SCH_RES_TYPE resType; + void *queryRes; void *resData; //TODO free it or not int32_t resNumOfRows; const char *sql; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 9354c1a875d030c1839484db967ad091d7fdf8a9..dcd87557aa62101a8cceae9f0f191fe2cae53e3b 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1058,8 +1058,6 @@ _return: int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); - pJob->resType = SCH_RES_TYPE_FETCH; - atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); atomic_store_ptr(&pJob->resData, pRsp); @@ -1070,6 +1068,27 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs return TSDB_CODE_SUCCESS; } +int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) { + if (rsp->tbFName[0]) { + if (NULL == pJob->queryRes) { + pJob->queryRes = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo)); + if (NULL == pJob->queryRes) { + SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + STbVerInfo tbInfo; + strcpy(tbInfo.tbFName, rsp->tbFName); + tbInfo.sversion = rsp->sversion; + tbInfo.tversion = rsp->tversion; + + taosArrayPush((SArray *)pJob->queryRes, &tbInfo); + } + + return TSDB_CODE_SUCCESS; +} + + // Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { @@ -1180,10 +1199,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); - pJob->resType = SCH_RES_TYPE_QUERY; SCH_LOCK(SCH_WRITE, &pJob->resLock); - if (pJob->resData) { - SSubmitRsp *sum = pJob->resData; + if (pJob->queryRes) { + SSubmitRsp *sum = pJob->queryRes; sum->affectedRows += rsp->affectedRows; sum->nBlocks += rsp->nBlocks; sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); @@ -1191,7 +1209,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch taosMemoryFree(rsp->pBlocks); taosMemoryFree(rsp); } else { - pJob->resData = rsp; + pJob->queryRes = rsp; } SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } @@ -1225,6 +1243,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } SCH_ERR_JRET(rsp->code); + + SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -2399,6 +2420,12 @@ void schFreeJobImpl(void *job) { qExplainFreeCtx(pJob->explainCtx); + if (SCH_IS_QUERY_JOB(pJob)) { + taosArrayDestroy((SArray *)pJob->queryRes); + } else { + tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes); + } + taosMemoryFreeClear(pJob->resData); taosMemoryFreeClear(pJob); @@ -2461,8 +2488,6 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); - pJob->resType = SCH_RES_TYPE_FETCH; - int64_t refId = taosAddRef(schMgmt.jobRef, pJob); if (refId < 0) { SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); @@ -2540,24 +2565,30 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + int32_t code = 0; + + *pJob = 0; + if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); } else { - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); + SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); } - SSchJob *job = schAcquireJob(*pJob); +_return: - pRes->code = atomic_load_32(&job->errCode); - pRes->numOfRows = job->resNumOfRows; - if (SCH_RES_TYPE_QUERY == job->resType) { - pRes->res = job->resData; - job->resData = NULL; - } + if (*pJob) { + SSchJob *job = schAcquireJob(*pJob); - schReleaseJob(*pJob); + pRes->code = atomic_load_32(&job->errCode); + pRes->numOfRows = job->resNumOfRows; + pRes->res = job->queryRes; + job->queryRes = NULL; - return TSDB_CODE_SUCCESS; + schReleaseJob(*pJob); + } + + return code; } int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {