diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a10c64c970d313f3dd28a931609164ef3e7a436c..a78ba98e83d74a16f63aa4d50eb47af404b729ee 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -190,14 +190,20 @@ typedef struct SEp { } SEp; typedef struct { + char dbFName[TSDB_DB_FNAME_LEN]; int32_t contLen; int32_t vgId; } SMsgHead; +typedef struct { + char dbFName[TSDB_DB_FNAME_LEN]; +} SRspHead; + // Submit message for one table typedef struct SSubmitBlk { int64_t uid; // table unique id int32_t tid; // table id + char tableName[TSDB_TABLE_NAME_LEN]; int32_t padding; // TODO just for padding here int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head @@ -2283,4 +2289,3 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p #endif #endif /*_TD_COMMON_TAOS_MSG_H_*/ - \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 39919bc233e5607fc47f9c091eab6a6e4e83c3ab..ba37f67676f83b7fa1ccb42ad8ec9cd885444ec0 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -968,7 +968,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { char *p = strchr(usedbReq.db, '.'); if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) { memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); - int32_t vgVersion = taosGetTimestampSec() / 300; + static int32_t vgVersion = 1; if (usedbReq.vgVersion < vgVersion) { usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); if (usedbRsp.pVgroupInfos == NULL) { @@ -977,12 +977,16 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { } mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); - usedbRsp.vgVersion = vgVersion; + usedbRsp.vgVersion = vgVersion++; + + if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + } } else { usedbRsp.vgVersion = usedbReq.vgVersion; + code = 0; } usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); - code = 0; } else { pDb = mndAcquireDb(pMnode, usedbReq.db); if (pDb == NULL) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4db14a2493b179c7da2b77c90b9ca1403f1a71a7..a577f4365588b44a7e79aad2c822ee4e176863f6 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5052,7 +5052,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; continue; } @@ -5206,8 +5206,6 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; - - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0); @@ -5605,7 +5603,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->nextDataFn = doSysTableScan; + pOperator->getNextFn = doSysTableScan; pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->pTaskInfo = pTaskInfo; @@ -7349,7 +7347,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo); - pOperator->nextDataFn = doProjectOperation; + pOperator->getNextFn = doProjectOperation; pOperator->pTaskInfo = pTaskInfo; pOperator->closeFn = destroyProjectOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a546ad44a46215f65c660115d9be33244e7b6fc0..8193859c9b5f8d8a03db70a6241f14a5bd8dcd82 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -840,3 +840,4 @@ int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** nodesDestroyNode(pLogicPlan); return code; } + \ No newline at end of file diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 1a4f09d0c8a29ff074273980489e5bfa9d01a8f7..6c4c8227635ff400d5daacf99d9e7d00d6d671d2 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1371,6 +1371,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, memcpy(pMsg->msg, pJob->sql, len); memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); + break; }