提交 111a06da 编写于 作者: X Xiaoyu Wang

TD-13747 merge 3.0

......@@ -140,6 +140,8 @@ typedef enum _mgmt_table {
#define TSDB_KILL_MSG_LEN 30
#define TSDB_TABLE_NUM_UNIT 100000
#define TSDB_VN_READ_ACCCESS ((char)0x1)
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
......@@ -169,6 +171,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SBuildUseDBInput;
typedef struct SField {
......@@ -570,6 +573,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SUseDbReq;
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
......@@ -800,8 +804,10 @@ typedef struct SVgroupInfo {
uint32_t hashBegin;
uint32_t hashEnd;
SEpSet epset;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SVgroupInfo;
typedef struct {
int32_t numOfVgroups;
SVgroupInfo vgroups[];
......
......@@ -74,9 +74,9 @@ typedef struct SDbVgVersion {
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SDbVgVersion;
int32_t catalogInit(SCatalogCfg *cfg);
/**
......@@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
*/
void catalogFreeHandle(SCatalog* pCatalog);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum);
/**
* Get a DB's all vgroup info.
......
......@@ -202,6 +202,7 @@ typedef struct SSubplan {
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SQueryNodeStat execNodeStat; // only for scan subplan
SNodeList* pChildren; // the datasource subplan,from which to fetch the result
SNodeList* pParents; // the data destination subplan, get data from current subplan
SPhysiNode* pNode; // physical plan of current subplan
......
......@@ -35,7 +35,6 @@ enum {
JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED,
JOB_TASK_STATUS_DROPPING,
JOB_TASK_STATUS_FREEING,
};
enum {
......@@ -83,6 +82,7 @@ typedef struct STableMeta {
typedef struct SDBVgInfo {
int32_t vgVersion;
int8_t hashMethod;
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
} SDBVgInfo;
......@@ -133,6 +133,10 @@ typedef struct SQueryNodeAddr {
SEpSet epset;
} SQueryNodeAddr;
typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
......
......@@ -25,6 +25,7 @@ extern "C" {
typedef struct SSchedulerCfg {
uint32_t maxJobNum;
int32_t maxNodeTableNum;
} SSchedulerCfg;
typedef struct SQueryProfileSummary {
......
......@@ -224,6 +224,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
SDbVgVersion *db = &dbs[i];
db->dbId = htobe64(db->dbId);
db->vgVersion = htonl(db->vgVersion);
db->numOfTable = htonl(db->numOfTable);
}
SKv kv = {
......
......@@ -1423,6 +1423,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfTable) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -1438,6 +1439,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfTable) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
......@@ -1482,6 +1484,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1;
if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1;
}
return 0;
......@@ -1542,6 +1545,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) {
if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1;
if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1;
if (tDecodeI32(pDecoder, &vgInfo.numOfTable) < 0) return -1;
taosArrayPush(pRsp->pVgroupInfos, &vgInfo);
}
......
......@@ -885,6 +885,29 @@ DROP_DB_OVER:
return code;
}
void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
int32_t vindex = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (vindex < pDb->cfg.numOfVgroups) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
*num += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
vindex++;
}
sdbRelease(pSdb, pVgroup);
}
sdbCancelFetch(pSdb, pIter);
}
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
int32_t vindex = 0;
SSdb *pSdb = pMnode->pSdb;
......@@ -900,6 +923,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
vgInfo.vgId = pVgroup->vgId;
vgInfo.hashBegin = pVgroup->hashBegin;
vgInfo.hashEnd = pVgroup->hashEnd;
vgInfo.numOfTable = pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
vgInfo.epset.numOfEps = pVgroup->replica;
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
......@@ -967,7 +991,10 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
goto USE_DB_OVER;
}
if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid) {
int32_t numOfTable = 0;
mndGetDBTableNum(pDb, pMnode, &numOfTable);
if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid || numOfTable != usedbReq.numOfTable) {
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
}
......@@ -1017,6 +1044,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
SDbVgVersion *pDbVgVersion = &pDbs[i];
pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId);
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
pDbVgVersion->numOfTable = htonl(pDbVgVersion->numOfTable);
SUseDbRsp usedbRsp = {0};
......@@ -1027,28 +1055,34 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
usedbRsp.uid = pDbVgVersion->dbId;
usedbRsp.vgVersion = -1;
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
} else if (pDbVgVersion->vgVersion >= pDb->vgVersion) {
mDebug("db:%s, version not changed", pDbVgVersion->dbFName);
mndReleaseDb(pMnode, pDb);
continue;
} else {
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) {
mndReleaseDb(pMnode, pDb);
mError("db:%s, failed to malloc usedb response", pDb->name);
continue;
}
}
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDb->uid;
usedbRsp.vgVersion = pDb->vgVersion;
usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos);
usedbRsp.hashMethod = pDb->hashMethod;
int32_t numOfTable = 0;
mndGetDBTableNum(pDb, pMnode, &numOfTable);
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) {
mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName);
mndReleaseDb(pMnode, pDb);
continue;
}
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) {
mndReleaseDb(pMnode, pDb);
mError("db:%s, failed to malloc usedb response", pDb->name);
continue;
}
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDb->uid;
usedbRsp.vgVersion = pDb->vgVersion;
usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos);
usedbRsp.hashMethod = pDb->hashMethod;
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
mndReleaseDb(pMnode, pDb);
}
int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp);
......
......@@ -965,7 +965,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
CTG_RET(code);
}
int32_t ctgStbVersionCompare(const void* key1, const void* key2) {
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
......@@ -975,7 +975,7 @@ int32_t ctgStbVersionCompare(const void* key1, const void* key2) {
}
}
int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
......@@ -985,6 +985,27 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
}
}
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
return 1;
} else {
return 0;
}
}
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
}
}
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
......@@ -1034,7 +1055,7 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
int16_t widx = abs(id % mgmt->slotNum);
SCtgRentSlot *slot = &mgmt->slots[widx];
......@@ -1048,12 +1069,12 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
if (slot->needSort) {
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
taosArraySort(slot->meta, compare);
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
}
void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
if (NULL == orig) {
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
......@@ -1075,8 +1096,8 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
int16_t widx = labs(id % mgmt->slotNum);
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
int16_t widx = abs(id % mgmt->slotNum);
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
......@@ -1088,12 +1109,12 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare)
}
if (slot->needSort) {
taosArraySort(slot->meta, compare);
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ);
int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
if (idx < 0) {
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
......@@ -1240,7 +1261,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
uint64_t *suid = NULL;
suid = taosHashGetKey(pIter, NULL);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) {
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
}
......@@ -1264,7 +1285,7 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
......@@ -1331,7 +1352,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
}
bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion};
SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache));
......@@ -1344,8 +1365,15 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
if (dbCache->vgInfo) {
if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) {
ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
ctgWReleaseVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
}
if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) {
ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
ctgWReleaseVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
......@@ -1365,7 +1393,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
dbCache = NULL;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
CTG_RET(code);
}
......@@ -1397,8 +1425,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare);
ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
}
origSuid = orig->suid;
......@@ -1511,6 +1539,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const
if (inCache) {
input.dbId = (*dbCache)->dbId;
input.vgVersion = (*dbCache)->vgInfo->vgVersion;
input.numOfTable = (*dbCache)->vgInfo->numOfTable;
} else {
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
}
......@@ -1924,7 +1953,7 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) {
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionCompare));
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
......@@ -2163,7 +2192,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
}
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId) {
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) {
......@@ -2194,6 +2223,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
*version = dbCache->vgInfo->vgVersion;
*dbId = dbCache->dbId;
*tableNum = dbCache->vgInfo->numOfTable;
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
......
......@@ -893,6 +893,8 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p
SUseDbReq usedbReq = {0};
tNameExtractFullName(&name, usedbReq.db);
catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -42,6 +42,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
for (int32_t i = 0; i < usedbRsp->vgNum; ++i) {
SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i);
pOut->dbVgroup->numOfTable += pVgInfo->numOfTable;
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -84,6 +85,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
usedbReq.vgVersion = pInput->vgVersion;
usedbReq.dbId = pInput->dbId;
usedbReq.numOfTable = pInput->numOfTable;
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
void *pBuf = rpcMallocCont(bufLen);
......@@ -247,7 +249,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
PROCESS_META_OVER:
if (code != 0) {
qError("failed to process table meta rsp since %s", terrstr());
qError("failed to process table meta rsp since %s", tstrerror(code));
}
tFreeSTableMetaRsp(&metaRsp);
......
......@@ -26,8 +26,10 @@ extern "C" {
#include "scheduler.h"
#include "thash.h"
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20 // unit is TSDB_TABLE_NUM_UNIT
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
......@@ -72,17 +74,27 @@ typedef struct SSchCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
SEpSet epSet;
} SSchCallbackParam;
typedef struct SSchFlowControl {
SRWLatch lock;
bool sorted;
int32_t tableNumSum;
uint32_t execTaskNum;
SArray *taskList; // Element is SSchTask*
} SSchFlowControl;
typedef struct SSchLevel {
int32_t level;
int8_t status;
SRWLatch lock;
int32_t taskFailed;
int32_t taskSucceed;
int32_t taskNum;
int32_t taskLaunchIdx; // launch startup index
SArray *subTasks; // Element is SQueryTask
int32_t level;
int8_t status;
SRWLatch lock;
int32_t taskFailed;
int32_t taskSucceed;
int32_t taskNum;
int32_t taskLaunchedNum;
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SArray *subTasks; // Element is SQueryTask
} SSchLevel;
typedef struct SSchTask {
......@@ -102,13 +114,14 @@ typedef struct SSchTask {
int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
void* handle; // task send handle
void* handle; // task send handle
} SSchTask;
typedef struct SSchJobAttr {
bool needFetch;
bool syncSchedule;
bool queryJob;
bool needFlowCtrl;
} SSchJobAttr;
typedef struct SSchJob {
......@@ -140,6 +153,8 @@ typedef struct SSchJob {
SQueryProfileSummary summary;
} SSchJob;
extern SSchedulerMgmt schMgmt;
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
......@@ -152,8 +167,17 @@ typedef struct SSchJob {
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epset.eps[(_addr)->epset.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epset.inUse = ((_addr)->epset.inUse + 1) % (_addr)->epset.numOfEps)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
......@@ -173,10 +197,19 @@ typedef struct SSchJob {
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchLevel *pLevel);
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "schedulerInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"
void schFreeFlowCtrl(SSchLevel *pLevel) {
if (NULL == pLevel->flowCtrl) {
return;
}
SSchFlowControl *ctrl = NULL;
void *pIter = taosHashIterate(pLevel->flowCtrl, NULL);
while (pIter) {
ctrl = (SSchFlowControl *)pIter;
if (ctrl->taskList) {
taosArrayDestroy(ctrl->taskList);
}
pIter = taosHashIterate(pLevel->flowCtrl, pIter);
}
taosHashCleanup(pLevel->flowCtrl);
pLevel->flowCtrl = NULL;
}
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
if (!SCH_IS_QUERY_JOB(pJob)) {
SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob));
return TSDB_CODE_SUCCESS;
}
int32_t sum = 0;
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
sum += pTask->plan->execNodeStat.tableNum;
}
if (sum < schMgmt.cfg.maxNodeTableNum) {
SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum);
return TSDB_CODE_SUCCESS;
}
pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pLevel->flowCtrl) {
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_SET_JOB_NEED_FLOW_CTRL(pJob);
SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", sum);
return TSDB_CODE_SUCCESS;
}
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
SSchLevel *pLevel = pTask->level;
SSchFlowControl *ctrl = NULL;
int32_t code = 0;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (ctrl->execTaskNum <= 0) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
--ctrl->execTaskNum;
ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
_return:
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
SCH_RET(code);
}
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SSchLevel *pLevel = pTask->level;
int32_t code = 0;
SSchFlowControl *ctrl = NULL;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
do {
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};
code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
if (code) {
if (HASH_NODE_EXIST(code)) {
continue;
}
SCH_TASK_ELOG("taosHashPut flowCtrl failed, size:%d", (int32_t)sizeof(nctrl));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
*enough = true;
return TSDB_CODE_SUCCESS;
}
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (0 == ctrl->execTaskNum) {
ctrl->tableNumSum = pTask->plan->execNodeStat.tableNum;
++ctrl->execTaskNum;
*enough = true;
break;
}
int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
if (sum <= schMgmt.cfg.maxNodeTableNum) {
ctrl->tableNumSum = sum;
++ctrl->execTaskNum;
*enough = true;
break;
}
if (NULL == ctrl->taskList) {
ctrl->taskList = taosArrayInit(pLevel->taskNum, POINTER_BYTES);
if (NULL == ctrl->taskList) {
SCH_TASK_ELOG("taosArrayInit taskList failed, size:%d", (int32_t)pLevel->taskNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
if (NULL == taosArrayPush(ctrl->taskList, &pTask)) {
SCH_TASK_ELOG("taosArrayPush to taskList failed, size:%d", (int32_t)taosArrayGetSize(ctrl->taskList));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*enough = false;
ctrl->sorted = false;
break;
} while (true);
_return:
SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
((*enough)?"":"NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
SCH_RET(code);
}
int32_t schTaskTableNumCompare(const void* key1, const void* key2) {
SSchTask *pTask1 = *(SSchTask **)key1;
SSchTask *pTask2 = *(SSchTask **)key2;
if (pTask1->plan->execNodeStat.tableNum < pTask2->plan->execNodeStat.tableNum) {
return 1;
} else if (pTask1->plan->execNodeStat.tableNum > pTask2->plan->execNodeStat.tableNum) {
return -1;
} else {
return 0;
}
}
int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) {
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
return TSDB_CODE_SUCCESS;
}
int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
int32_t taskNum = taosArrayGetSize(ctrl->taskList);
int32_t code = 0;
SSchTask *pTask = NULL;
if (taskNum > 1 && !ctrl->sorted) {
taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order
}
for (int32_t i = 0; i < taskNum; ++i) {
pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i);
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
continue;
}
ctrl->tableNumSum += pTask->plan->execNodeStat.tableNum;
++ctrl->execTaskNum;
taosArrayRemove(ctrl->taskList, i);
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
remainNum -= pTask->plan->execNodeStat.tableNum;
if (remainNum <= 0) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum);
break;
}
if (i < (taskNum - 1)) {
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);
break;
}
}
--i;
--taskNum;
}
_return:
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code);
}
SCH_RET(code);
}
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
return TSDB_CODE_SUCCESS;
}
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
SSchLevel *pLevel = pTask->level;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_RET(schLaunchTasksInFlowCtrlListImpl(pJob, ctrl));
}
......@@ -125,6 +125,9 @@ void schtBuildQueryDag(SQueryPlan *dag) {
mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY;
merge->pNodeList = nodesMakeList();
scan->pNodeList = nodesMakeList();
nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
nodesListAppend(scan->pNodeList, (SNode*)scanPlan);
......@@ -135,6 +138,68 @@ void schtBuildQueryDag(SQueryPlan *dag) {
nodesListAppend(dag->pSubplans, (SNode*)scan);
}
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
uint64_t qId = schtQueryId;
int32_t scanPlanNum = 20;
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *scanPlan = (SSubplan *)calloc(scanPlanNum, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));
merge->pNodeList = nodesMakeList();
scan->pNodeList = nodesMakeList();
mergePlan->pChildren = nodesMakeList();
for (int32_t i = 0; i < scanPlanNum; ++i) {
scanPlan[i].id.queryId = qId;
scanPlan[i].id.templateId = 0x0000000000000002;
scanPlan[i].id.subplanId = 0x0000000000000003 + i;
scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN;
scanPlan[i].execNode.nodeId = 1 + i;
scanPlan[i].execNode.epset.inUse = 0;
scanPlan[i].execNodeStat.tableNum = rand() % 30;
addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep0", 6030);
addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep1", 6030);
addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep2", 6030);
scanPlan[i].execNode.epset.inUse = rand() % 3;
scanPlan[i].pChildren = NULL;
scanPlan[i].level = 1;
scanPlan[i].pParents = nodesMakeList();
scanPlan[i].pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode));
scanPlan[i].msgType = TDMT_VND_QUERY;
nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i));
nodesListAppend(scan->pNodeList, (SNode*)(scanPlan + i));
}
mergePlan->id.queryId = qId;
mergePlan->id.templateId = schtMergeTemplateId;
mergePlan->id.subplanId = 0x5555;
mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
mergePlan->level = 0;
mergePlan->execNode.epset.numOfEps = 0;
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY;
nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
nodesListAppend(dag->pSubplans, (SNode*)merge);
nodesListAppend(dag->pSubplans, (SNode*)scan);
}
void schtFreeQueryDag(SQueryPlan *dag) {
}
......@@ -182,6 +247,8 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan[1].pDataSink = (SDataSinkNode*)calloc(1, sizeof(SDataSinkNode));
insertPlan[1].msgType = TDMT_VND_SUBMIT;
inserta->pNodeList = nodesMakeList();
nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
insertPlan += 1;
nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
......@@ -547,11 +614,9 @@ TEST(queryTest, normalCase) {
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan dag;
memset(&dag, 0, sizeof(dag));
schtInitLogFile();
memset(&dag, 0, sizeof(dag));
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
......@@ -648,6 +713,101 @@ TEST(queryTest, normalCase) {
schedulerDestroy();
}
TEST(queryTest, flowCtrlCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan dag;
schtInitLogFile();
srand(time(NULL));
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL);
ASSERT_EQ(code, 0);
schtBuildQueryFlowCtrlDag(&dag);
schtSetPlanToString();
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job);
bool queryDone = false;
while (!queryDone) {
void *pIter = taosHashIterate(pJob->execTasks, NULL);
if (NULL == pIter) {
break;
}
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
taosHashCancelIterate(pJob->execTasks, pIter);
if (task->lastMsgType == TDMT_VND_QUERY) {
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
} else if (task->lastMsgType == TDMT_VND_RES_READY) {
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
} else {
queryDone = true;
break;
}
pIter = NULL;
}
}
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL;
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
ASSERT_EQ(pRsp->completed, 1);
ASSERT_EQ(pRsp->numOfRows, 10);
tfree(data);
data = NULL;
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
schReleaseJob(job);
schedulerFreeJob(job);
schtFreeQueryDag(&dag);
schedulerDestroy();
}
TEST(insertTest, normalCase) {
......@@ -659,8 +819,6 @@ TEST(insertTest, normalCase) {
SQueryPlan dag;
uint64_t numOfRows = 0;
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册