未验证 提交 b8a3a16a 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #12764 from taosdata/feature/qnode

feat: refresh table meta based on sversion
...@@ -1352,6 +1352,9 @@ typedef struct { ...@@ -1352,6 +1352,9 @@ typedef struct {
typedef struct { typedef struct {
int32_t code; int32_t code;
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion;
int32_t tversion;
} SResReadyRsp; } SResReadyRsp;
typedef struct { typedef struct {
......
...@@ -57,6 +57,12 @@ typedef struct SIndexMeta { ...@@ -57,6 +57,12 @@ typedef struct SIndexMeta {
} SIndexMeta; } SIndexMeta;
typedef struct STbVerInfo {
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion;
int32_t tversion;
} STbVerInfo;
/* /*
* ASSERT(sizeof(SCTableMeta) == 24) * ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE) * ASSERT(tableType == TSDB_CHILD_TABLE)
......
...@@ -45,7 +45,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC ...@@ -45,7 +45,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
catalogUpdateUserAuthInfo(pCatalog, rsp); catalogUpdateUserAuthInfo(pCatalog, rsp);
} }
tFreeSUserAuthBatchRsp(&batchRsp); taosArrayDestroy(batchRsp.pArray);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -285,6 +285,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { ...@@ -285,6 +285,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
int64_t *rid = pIter; int64_t *rid = pIter;
SRequestObj *pRequest = acquireRequest(*rid); SRequestObj *pRequest = acquireRequest(*rid);
if (NULL == pRequest) { if (NULL == pRequest) {
pIter = taosHashIterate(pObj->pRequests, pIter);
continue; continue;
} }
...@@ -544,7 +545,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -544,7 +545,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
} }
taosArrayPush(pBatchReq->reqs, pOneReq); taosArrayPush(pBatchReq->reqs, pOneReq);
hbClearClientHbReq(pOneReq); //hbClearClientHbReq(pOneReq);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
} }
...@@ -565,6 +566,11 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { ...@@ -565,6 +566,11 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
tFreeReqKvHash(pOneReq->info); tFreeReqKvHash(pOneReq->info);
taosHashClear(pOneReq->info); taosHashClear(pOneReq->info);
if (pOneReq->query) {
taosArrayDestroy(pOneReq->query->queryDesc);
taosMemoryFreeClear(pOneReq->query);
}
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
} }
} }
...@@ -649,6 +655,9 @@ static int32_t hbCreateThread() { ...@@ -649,6 +655,9 @@ static int32_t hbCreateThread() {
} }
static void hbStopThread() { static void hbStopThread() {
if (0 == atomic_load_8(&clientHbMgr.inited)) {
return;
}
if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) { if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
tscDebug("hb thread already stopped"); tscDebug("hb thread already stopped");
return; return;
...@@ -745,13 +754,13 @@ int hbMgrInit() { ...@@ -745,13 +754,13 @@ int hbMgrInit() {
hbMgrInitHandle(); hbMgrInitHandle();
// init backgroud thread // init backgroud thread
/*hbCreateThread();*/ hbCreateThread();
return 0; return 0;
} }
void hbMgrCleanUp() { void hbMgrCleanUp() {
// hbStopThread(); hbStopThread();
// destroy all appHbMgr // destroy all appHbMgr
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
......
...@@ -300,6 +300,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -300,6 +300,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
schedulerFreeJob(pRequest->body.queryJob); schedulerFreeJob(pRequest->body.queryJob);
} }
*pRes = res.res;
pRequest->code = code; pRequest->code = code;
terrno = code; terrno = code;
return pRequest->code; return pRequest->code;
...@@ -347,6 +349,23 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) { ...@@ -347,6 +349,23 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
} else if (TDMT_VND_QUERY == pRequest->type) { } else if (TDMT_VND_QUERY == pRequest->type) {
SArray* pTbArray = (SArray*)res;
int32_t tbNum = taosArrayGetSize(pTbArray);
if (tbNum <= 0) {
return TSDB_CODE_SUCCESS;
}
pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
if (NULL == pArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < tbNum; ++i) {
STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion};
taosArrayPush(pArray, &tbSver);
}
} }
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
...@@ -371,6 +390,7 @@ void freeRequestRes(SRequestObj* pRequest, void* res) { ...@@ -371,6 +390,7 @@ void freeRequestRes(SRequestObj* pRequest, void* res) {
if (TDMT_VND_SUBMIT == pRequest->type) { if (TDMT_VND_SUBMIT == pRequest->type) {
tFreeSSubmitRsp((SSubmitRsp*)res); tFreeSSubmitRsp((SSubmitRsp*)res);
} else if (TDMT_VND_QUERY == pRequest->type) { } else if (TDMT_VND_QUERY == pRequest->type) {
taosArrayDestroy((SArray *)res);
} }
} }
......
...@@ -340,8 +340,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -340,8 +340,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
if (pHbReq->query) { if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query; SQueryHbReqBasic *pBasic = pHbReq->query;
SRpcConnInfo connInfo = {0}; SRpcConnInfo connInfo = pMsg->conn;
rpcGetConnInfo(pMsg->info.handle, &connInfo);
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) { if (pConn == NULL) {
......
...@@ -233,6 +233,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { ...@@ -233,6 +233,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew); mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew);
taosWLockLatch(&pOld->lock); taosWLockLatch(&pOld->lock);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
pOld->authVersion = pNew->authVersion;
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
TSWAP(pOld->readDbs, pNew->readDbs); TSWAP(pOld->readDbs, pNew->readDbs);
TSWAP(pOld->writeDbs, pNew->writeDbs); TSWAP(pOld->writeDbs, pNew->writeDbs);
...@@ -765,6 +766,7 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_ ...@@ -765,6 +766,7 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_
continue; continue;
} }
pUsers[i].version = ntohl(pUsers[i].version);
if (pUser->authVersion <= pUsers[i].version) { if (pUser->authVersion <= pUsers[i].version) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
continue; continue;
......
...@@ -2885,7 +2885,7 @@ _return: ...@@ -2885,7 +2885,7 @@ _return:
int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver) { int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver, int32_t *tbType, uint64_t *suid, char* stbName) {
*sver = -1; *sver = -1;
if (NULL == pCtg->dbCache) { if (NULL == pCtg->dbCache) {
...@@ -2903,14 +2903,12 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* ...@@ -2903,14 +2903,12 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tbType = 0;
uint64_t suid = 0;
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta* tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname)); STableMeta* tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));
if (tbMeta) { if (tbMeta) {
tbType = tbMeta->tableType; *tbType = tbMeta->tableType;
suid = tbMeta->suid; *suid = tbMeta->suid;
if (tbType != TSDB_CHILD_TABLE) { if (*tbType != TSDB_CHILD_TABLE) {
*sver = tbMeta->sversion; *sver = tbMeta->sversion;
} }
} }
...@@ -2921,44 +2919,49 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* ...@@ -2921,44 +2919,49 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (tbType != TSDB_CHILD_TABLE) { if (*tbType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname); ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, suid); ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, *suid);
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock); CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &suid, sizeof(suid)); STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, suid, sizeof(*suid));
if (NULL == stbMeta || NULL == *stbMeta) { if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("stb not in stbCache, suid:%"PRIx64, suid); ctgDebug("stb not in stbCache, suid:%"PRIx64, *suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if ((*stbMeta)->suid != suid) { if ((*stbMeta)->suid != *suid) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, suid, (*stbMeta)->suid); ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, *suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
size_t nameLen = 0;
char* name = taosHashGetKey(*stbMeta, &nameLen);
strncpy(stbName, name, nameLen);
stbName[nameLen] = 0;
*sver = (*stbMeta)->sversion; *sver = (*stbMeta)->sversion;
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname); ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) { int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) {
CTG_API_ENTER(); CTG_API_ENTER();
...@@ -2977,9 +2980,26 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm ...@@ -2977,9 +2980,26 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
continue; continue;
} }
ctgGetTbSverFromCache(pCtg, &name, &sver); int32_t tbType = 0;
uint64_t suid = 0;
char stbName[TSDB_TABLE_FNAME_LEN];
ctgGetTbSverFromCache(pCtg, &name, &sver, &tbType, &suid, stbName);
if (sver >= 0 && sver < pTb->sver) { if (sver >= 0 && sver < pTb->sver) {
catalogRemoveTableMeta(pCtg, &name); //TODO REMOVE STB FROM CACHE switch (tbType) {
case TSDB_CHILD_TABLE: {
SName stb = name;
strcpy(stb.tname, stbName);
catalogRemoveTableMeta(pCtg, &stb);
break;
}
case TSDB_SUPER_TABLE:
case TSDB_NORMAL_TABLE:
catalogRemoveTableMeta(pCtg, &name);
break;
default:
ctgError("ignore table type %d", tbType);
break;
}
} }
} }
......
...@@ -187,8 +187,16 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab ...@@ -187,8 +187,16 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab
*sversion = pTaskInfo->schemaVer.sversion; *sversion = pTaskInfo->schemaVer.sversion;
*tversion = pTaskInfo->schemaVer.tversion; *tversion = pTaskInfo->schemaVer.tversion;
if (pTaskInfo->schemaVer.dbname) {
strcpy(dbName, pTaskInfo->schemaVer.dbname); strcpy(dbName, pTaskInfo->schemaVer.dbname);
} else {
dbName[0] = 0;
}
if (pTaskInfo->schemaVer.tablename) {
strcpy(tableName, pTaskInfo->schemaVer.tablename); strcpy(tableName, pTaskInfo->schemaVer.tablename);
} else {
tableName[0] = 0;
}
return 0; return 0;
} }
\ No newline at end of file
...@@ -4574,10 +4574,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* ...@@ -4574,10 +4574,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
return pExprs; return pExprs;
} }
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model) { static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->schemaVer.dbname = strdup(dbFName);
pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId; pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model; pTaskInfo->execModel = model;
...@@ -4992,16 +4993,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod ...@@ -4992,16 +4993,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return NULL; return NULL;
} }
const char* tname = pTaskInfo->schemaVer.tablename;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
if (tname != NULL && (pTaskInfo->schemaVer.dbname == NULL) &&
strncmp(pColNode->tableName, tname, tListLen(pColNode->tableName)) == 0) {
pTaskInfo->schemaVer.dbname = strdup(pColNode->dbName);
}
SColMatchInfo c = {0}; SColMatchInfo c = {0};
c.output = true; c.output = true;
c.colId = pColNode->colId; c.colId = pColNode->colId;
...@@ -5097,7 +5092,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -5097,7 +5092,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
uint64_t queryId = pPlan->id.queryId; uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
*pTaskInfo = createExecTaskInfo(queryId, taskId, model); *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
if (*pTaskInfo == NULL) { if (*pTaskInfo == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY; code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete; goto _complete;
......
...@@ -58,7 +58,7 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen ...@@ -58,7 +58,7 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen
// 3. valid column names // 3. valid column names
for (int32_t j = i + 1; j < numOfCols; ++j) { for (int32_t j = i + 1; j < numOfCols; ++j) {
if (strncasecmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) { if (strncmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) {
return false; return false;
} }
} }
......
...@@ -131,6 +131,7 @@ typedef struct SQWTaskCtx { ...@@ -131,6 +131,7 @@ typedef struct SQWTaskCtx {
void *taskHandle; void *taskHandle;
void *sinkHandle; void *sinkHandle;
SSubplan *plan; SSubplan *plan;
STbVerInfo tbInfo;
} SQWTaskCtx; } SQWTaskCtx;
typedef struct SQWSchStatus { typedef struct SQWSchStatus {
......
...@@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i ...@@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
int32_t code); int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);
......
...@@ -718,6 +718,16 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void ...@@ -718,6 +718,16 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
}
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
...@@ -899,6 +909,11 @@ _return: ...@@ -899,6 +909,11 @@ _return:
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
} }
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code, ctx ? &ctx->tbInfo : NULL);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
}
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
...@@ -910,11 +925,6 @@ _return: ...@@ -910,11 +925,6 @@ _return:
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
}
if (code) { if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
} }
...@@ -975,6 +985,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ...@@ -975,6 +985,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
atomic_store_ptr(&ctx->sinkHandle, sinkHandle); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
if (pTaskInfo && sinkHandle) { if (pTaskInfo && sinkHandle) {
qwSaveTbVersionInfo(pTaskInfo, ctx);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
} }
...@@ -1047,7 +1058,7 @@ _return: ...@@ -1047,7 +1058,7 @@ _return:
} }
if (needRsp) { if (needRsp) {
qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); qwBuildAndSendReadyRsp(&qwMsg->connInfo, code, NULL);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }
...@@ -1150,8 +1161,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1150,8 +1161,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) { if (NULL == rsp) {
atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle); ctx->dataConnInfo = qwMsg->connInfo;
atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
} else { } else {
......
...@@ -63,9 +63,14 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) { ...@@ -63,9 +63,14 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code) { int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code; pRsp->code = code;
if (tbInfo) {
strcpy(pRsp->tbFName, tbInfo->tbFName);
pRsp->sversion = tbInfo->sversion;
pRsp->tversion = tbInfo->tversion;
}
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP, .msgType = TDMT_VND_RES_READY_RSP,
......
...@@ -39,12 +39,6 @@ enum { ...@@ -39,12 +39,6 @@ enum {
SCH_WRITE, SCH_WRITE,
}; };
typedef enum {
SCH_RES_TYPE_QUERY,
SCH_RES_TYPE_FETCH,
} SCH_RES_TYPE;
typedef struct SSchTrans { typedef struct SSchTrans {
void *transInst; void *transInst;
void *transHandle; void *transHandle;
...@@ -197,7 +191,7 @@ typedef struct SSchJob { ...@@ -197,7 +191,7 @@ typedef struct SSchJob {
int32_t errCode; int32_t errCode;
SArray *errList; // SArray<SQueryErrorInfo> SArray *errList; // SArray<SQueryErrorInfo>
SRWLatch resLock; SRWLatch resLock;
SCH_RES_TYPE resType; void *queryRes;
void *resData; //TODO free it or not void *resData; //TODO free it or not
int32_t resNumOfRows; int32_t resNumOfRows;
const char *sql; const char *sql;
......
...@@ -1058,8 +1058,6 @@ _return: ...@@ -1058,8 +1058,6 @@ _return:
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
pJob->resType = SCH_RES_TYPE_FETCH;
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
atomic_store_ptr(&pJob->resData, pRsp); atomic_store_ptr(&pJob->resData, pRsp);
...@@ -1070,6 +1068,27 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs ...@@ -1070,6 +1068,27 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) {
if (rsp->tbFName[0]) {
if (NULL == pJob->queryRes) {
pJob->queryRes = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
if (NULL == pJob->queryRes) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
STbVerInfo tbInfo;
strcpy(tbInfo.tbFName, rsp->tbFName);
tbInfo.sversion = rsp->sversion;
tbInfo.tversion = rsp->tversion;
taosArrayPush((SArray *)pJob->queryRes, &tbInfo);
}
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal // Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
int32_t rspCode) { int32_t rspCode) {
...@@ -1180,10 +1199,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1180,10 +1199,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
pJob->resType = SCH_RES_TYPE_QUERY;
SCH_LOCK(SCH_WRITE, &pJob->resLock); SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->resData) { if (pJob->queryRes) {
SSubmitRsp *sum = pJob->resData; SSubmitRsp *sum = pJob->queryRes;
sum->affectedRows += rsp->affectedRows; sum->affectedRows += rsp->affectedRows;
sum->nBlocks += rsp->nBlocks; sum->nBlocks += rsp->nBlocks;
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
...@@ -1191,7 +1209,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1191,7 +1209,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
taosMemoryFree(rsp->pBlocks); taosMemoryFree(rsp->pBlocks);
taosMemoryFree(rsp); taosMemoryFree(rsp);
} else { } else {
pJob->resData = rsp; pJob->queryRes = rsp;
} }
SCH_UNLOCK(SCH_WRITE, &pJob->resLock); SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
} }
...@@ -1225,6 +1243,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1225,6 +1243,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
...@@ -2399,6 +2420,12 @@ void schFreeJobImpl(void *job) { ...@@ -2399,6 +2420,12 @@ void schFreeJobImpl(void *job) {
qExplainFreeCtx(pJob->explainCtx); qExplainFreeCtx(pJob->explainCtx);
if (SCH_IS_QUERY_JOB(pJob)) {
taosArrayDestroy((SArray *)pJob->queryRes);
} else {
tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes);
}
taosMemoryFreeClear(pJob->resData); taosMemoryFreeClear(pJob->resData);
taosMemoryFreeClear(pJob); taosMemoryFreeClear(pJob);
...@@ -2461,8 +2488,6 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa ...@@ -2461,8 +2488,6 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
pJob->resType = SCH_RES_TYPE_FETCH;
int64_t refId = taosAddRef(schMgmt.jobRef, pJob); int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) { if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
...@@ -2540,24 +2565,30 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in ...@@ -2540,24 +2565,30 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
} else { } else {
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
} }
_return:
if (*pJob) {
SSchJob *job = schAcquireJob(*pJob); SSchJob *job = schAcquireJob(*pJob);
pRes->code = atomic_load_32(&job->errCode); pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows; pRes->numOfRows = job->resNumOfRows;
if (SCH_RES_TYPE_QUERY == job->resType) { pRes->res = job->queryRes;
pRes->res = job->resData; job->queryRes = NULL;
job->resData = NULL;
}
schReleaseJob(*pJob); schReleaseJob(*pJob);
}
return TSDB_CODE_SUCCESS; return code;
} }
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) { int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册