From d472c28142f2083241aa6586a3c7a551eb1d7db1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 19 May 2022 22:08:06 +0800 Subject: [PATCH] update table meta based on query sversion --- include/common/tmsg.h | 3 +++ include/libs/qcom/query.h | 6 ++++++ source/client/src/clientImpl.c | 18 +++++++++++++++++- source/libs/executor/src/executor.c | 12 ++++++++++-- source/libs/qworker/inc/qworkerInt.h | 1 + source/libs/qworker/inc/qworkerMsg.h | 2 +- source/libs/qworker/src/qworker.c | 23 +++++++++++++++++------ source/libs/qworker/src/qworkerMsg.c | 7 ++++++- source/libs/scheduler/src/scheduler.c | 25 +++++++++++++++++++++++++ 9 files changed, 86 insertions(+), 11 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index addb84046c..5b1871dec6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1352,6 +1352,9 @@ typedef struct { typedef struct { int32_t code; + char tbFName[TSDB_TABLE_FNAME_LEN]; + int32_t sversion; + int32_t tversion; } SResReadyRsp; typedef struct { diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index daf008108b..521fb78566 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -57,6 +57,12 @@ typedef struct SIndexMeta { } SIndexMeta; +typedef struct STbVerInfo { + char tbFName[TSDB_TABLE_FNAME_LEN]; + int32_t sversion; + int32_t tversion; +} STbVerInfo; + /* * ASSERT(sizeof(SCTableMeta) == 24) * ASSERT(tableType == TSDB_CHILD_TABLE) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7693d26d3f..2b3bbd21f3 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -344,7 +344,23 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) { taosArrayPush(pArray, &tbSver); } } else if (TDMT_VND_QUERY == pRequest->type) { + SArray* pTbArray = (SArray*)res; + int32_t tbNum = taosArrayGetSize(pTbArray); + if (tbNum <= 0) { + return TSDB_CODE_SUCCESS; + } + + pArray = taosArrayInit(tbNum, sizeof(STbSVersion)); + if (NULL == pArray) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < tbNum; ++i) { + STbVerInfo* tbInfo = taosArrayGet(pTbArray, i); + STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion}; + taosArrayPush(pArray, &tbSver); + } } SCatalog* pCatalog = NULL; @@ -369,7 +385,7 @@ void freeRequestRes(SRequestObj* pRequest, void* res) { if (TDMT_VND_SUBMIT == pRequest->type) { tFreeSSubmitRsp((SSubmitRsp*)res); } else if (TDMT_VND_QUERY == pRequest->type) { - + taosArrayDestroy((SArray *)res); } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fa840e1cd6..a6251e047a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -177,8 +177,16 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab *sversion = pTaskInfo->schemaVer.sversion; *tversion = pTaskInfo->schemaVer.tversion; - strcpy(dbName, pTaskInfo->schemaVer.dbname); - strcpy(tableName, pTaskInfo->schemaVer.tablename); + if (pTaskInfo->schemaVer.dbname) { + strcpy(dbName, pTaskInfo->schemaVer.dbname); + } else { + dbName[0] = 0; + } + if (pTaskInfo->schemaVer.tablename) { + strcpy(tableName, pTaskInfo->schemaVer.tablename); + } else { + tableName[0] = 0; + } return 0; } \ No newline at end of file diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index bcc17a9ae9..511327658f 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -131,6 +131,7 @@ typedef struct SQWTaskCtx { void *taskHandle; void *sinkHandle; SSubplan *plan; + STbVerInfo tbInfo; } SQWTaskCtx; typedef struct SQWSchStatus { diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index 93994c8287..6453cff700 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); -int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code); +int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo); int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); void qwFreeFetchRsp(void *msg); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index db63c71d11..102cf9aa0a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -718,6 +718,16 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void return TSDB_CODE_SUCCESS; } + +void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { + char dbFName[TSDB_DB_FNAME_LEN]; + char tbName[TSDB_TABLE_NAME_LEN]; + + qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); + + sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName); +} + int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; SQWTaskCtx *ctx = NULL; @@ -899,6 +909,11 @@ _return: qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED); } + if (readyConnection) { + qwBuildAndSendReadyRsp(readyConnection, code, ctx ? &ctx->tbInfo : NULL); + QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); + } + if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); @@ -910,11 +925,6 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - if (readyConnection) { - qwBuildAndSendReadyRsp(readyConnection, code); - QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); - } - if (code) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); } @@ -975,6 +985,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex atomic_store_ptr(&ctx->sinkHandle, sinkHandle); if (pTaskInfo && sinkHandle) { + qwSaveTbVersionInfo(pTaskInfo, ctx); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); } @@ -1047,7 +1058,7 @@ _return: } if (needRsp) { - qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); + qwBuildAndSendReadyRsp(&qwMsg->connInfo, code, NULL); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index d502d952f3..d66b8db2bd 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -63,9 +63,14 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code) { +int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) { SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); pRsp->code = code; + if (tbInfo) { + strcpy(pRsp->tbFName, tbInfo->tbFName); + pRsp->sversion = tbInfo->sversion; + pRsp->tversion = tbInfo->tversion; + } SRpcMsg rpcRsp = { .msgType = TDMT_VND_RES_READY_RSP, diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 9354c1a875..7f79122e0e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1070,6 +1070,27 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs return TSDB_CODE_SUCCESS; } +int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) { + if (rsp->tbFName[0]) { + if (NULL == pJob->resData) { + pJob->resData = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo)); + if (NULL == pJob->resData) { + SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + STbVerInfo tbInfo; + strcpy(tbInfo.tbFName, rsp->tbFName); + tbInfo.sversion = rsp->sversion; + tbInfo.tversion = rsp->tversion; + + taosArrayPush((SArray *)pJob->resData, &tbInfo); + } + + return TSDB_CODE_SUCCESS; +} + + // Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { @@ -1225,6 +1246,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } SCH_ERR_JRET(rsp->code); + pJob->resType = SCH_RES_TYPE_QUERY; + + SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; -- GitLab