diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a32d4110d9c09e688be2b3e635358c51a41c9680..905472dc712aa3064e4dadd85746f3c294f1c535 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -299,6 +299,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList schedulerFreeJob(pRequest->body.queryJob); } + *pRes = res.res; + pRequest->code = code; terrno = code; return pRequest->code; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 23957d1a6bfdc0cd87dd6f94727d0e1dc76adb57..d5301e3624fd632d31e48e6eae0944d5ceebd8be 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -2885,7 +2885,7 @@ _return: -int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver) { +int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver, int32_t *tbType, uint64_t *suid, char* stbName) { *sver = -1; if (NULL == pCtg->dbCache) { @@ -2903,14 +2903,12 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* return TSDB_CODE_SUCCESS; } - int32_t tbType = 0; - uint64_t suid = 0; CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); STableMeta* tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname)); if (tbMeta) { - tbType = tbMeta->tableType; - suid = tbMeta->suid; - if (tbType != TSDB_CHILD_TABLE) { + *tbType = tbMeta->tableType; + *suid = tbMeta->suid; + if (*tbType != TSDB_CHILD_TABLE) { *sver = tbMeta->sversion; } } @@ -2921,44 +2919,49 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* return TSDB_CODE_SUCCESS; } - if (tbType != TSDB_CHILD_TABLE) { + if (*tbType != TSDB_CHILD_TABLE) { ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname); + ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } - ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, suid); + ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, *suid); CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock); - STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &suid, sizeof(suid)); + STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, suid, sizeof(*suid)); if (NULL == stbMeta || NULL == *stbMeta) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("stb not in stbCache, suid:%"PRIx64, suid); + ctgDebug("stb not in stbCache, suid:%"PRIx64, *suid); return TSDB_CODE_SUCCESS; } - if ((*stbMeta)->suid != suid) { + if ((*stbMeta)->suid != *suid) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); ctgReleaseDBCache(pCtg, dbCache); - ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, suid, (*stbMeta)->suid); + ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, *suid, (*stbMeta)->suid); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } + size_t nameLen = 0; + char* name = taosHashGetKey(*stbMeta, &nameLen); + + strncpy(stbName, name, nameLen); + stbName[nameLen] = 0; + *sver = (*stbMeta)->sversion; CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname); + ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } - int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) { CTG_API_ENTER(); @@ -2977,9 +2980,26 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm continue; } - ctgGetTbSverFromCache(pCtg, &name, &sver); + int32_t tbType = 0; + uint64_t suid = 0; + char stbName[TSDB_TABLE_FNAME_LEN]; + ctgGetTbSverFromCache(pCtg, &name, &sver, &tbType, &suid, stbName); if (sver >= 0 && sver < pTb->sver) { - catalogRemoveTableMeta(pCtg, &name); //TODO REMOVE STB FROM CACHE + switch (tbType) { + case TSDB_CHILD_TABLE: { + SName stb = name; + strcpy(stb.tname, stbName); + catalogRemoveTableMeta(pCtg, &stb); + break; + } + case TSDB_SUPER_TABLE: + case TSDB_NORMAL_TABLE: + catalogRemoveTableMeta(pCtg, &name); + break; + default: + ctgError("ignore table type %d", tbType); + break; + } } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 37aeb367f521d389b53a52b2e45c0dda6bcbbb13..7c9ea1a5c5a4caed817df5199e5c60e006237588 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4576,10 +4576,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* return pExprs; } -static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model) { +static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) { SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + pTaskInfo->schemaVer.dbname = strdup(dbFName); pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; @@ -4994,16 +4995,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod return NULL; } - const char* tname = pTaskInfo->schemaVer.tablename; for (int32_t i = 0; i < numOfCols; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - if (tname != NULL && (pTaskInfo->schemaVer.dbname == NULL) && - strncmp(pColNode->tableName, tname, tListLen(pColNode->tableName)) == 0) { - pTaskInfo->schemaVer.dbname = strdup(pColNode->dbName); - } - SColMatchInfo c = {0}; c.output = true; c.colId = pColNode->colId; @@ -5099,7 +5094,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead uint64_t queryId = pPlan->id.queryId; int32_t code = TSDB_CODE_SUCCESS; - *pTaskInfo = createExecTaskInfo(queryId, taskId, model); + *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName); if (*pTaskInfo == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _complete; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 58886d706af0e1091ddb9ac805deb50510d85a37..f4ba2fca8146b37a3e25bc7173488dd85c8e48d0 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -58,7 +58,7 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen // 3. valid column names for (int32_t j = i + 1; j < numOfCols; ++j) { - if (strncasecmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) { + if (strncmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) { return false; } } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 102cf9aa0a8f1c2efe18a0df206f642b4666f4be..cb7ff08fa3b8c869a101ad202c274490a49091bd 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1161,8 +1161,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { - atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle); - atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle); + ctx->dataConnInfo = qwMsg->connInfo; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 7f79122e0ec132284ecd63551c25d47a6b8761af..899748ec45d8c800ef184bc714f1946435db34ce 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2565,24 +2565,32 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + int32_t code = 0; + + *pJob = 0; + if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); } else { - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); + SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); } - SSchJob *job = schAcquireJob(*pJob); +_return: - pRes->code = atomic_load_32(&job->errCode); - pRes->numOfRows = job->resNumOfRows; - if (SCH_RES_TYPE_QUERY == job->resType) { - pRes->res = job->resData; - job->resData = NULL; - } + if (*pJob) { + SSchJob *job = schAcquireJob(*pJob); - schReleaseJob(*pJob); + pRes->code = atomic_load_32(&job->errCode); + pRes->numOfRows = job->resNumOfRows; + if (SCH_RES_TYPE_QUERY == job->resType) { + pRes->res = job->resData; + job->resData = NULL; + } - return TSDB_CODE_SUCCESS; + schReleaseJob(*pJob); + } + + return code; } int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {