提交 732a0b2c 编写于 作者: D dapan1121

feature/scheduler

上级 9cbe40ae
......@@ -103,11 +103,10 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param forceUpdate (input, force update db vgroup info from mnode)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList);
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList);
int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
......
......@@ -913,12 +913,12 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (vindex < pDb->cfg.numOfVgroups) {
while (true) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
if (NULL == pDb || pVgroup->dbUid == pDb->uid) {
SVgroupInfo vgInfo = {0};
vgInfo.vgId = pVgroup->vgId;
vgInfo.hashBegin = pVgroup->hashBegin;
......@@ -943,6 +943,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
}
sdbRelease(pSdb, pVgroup);
if (pDb && (vindex >= pDb->cfg.numOfVgroups)) {
break;
}
}
sdbCancelFetch(pSdb, pIter);
......@@ -964,6 +968,20 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
char *p = strchr(usedbReq.db, '.');
if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) {
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
int32_t vgVersion = taosGetTimestampSec() / 300;
if (usedbReq.vgVersion < vgVersion) {
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto USE_DB_OVER;
}
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
usedbRsp.vgVersion = vgVersion;
} else {
usedbRsp.vgVersion = usedbReq.vgVersion;
}
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
code = 0;
} else {
pDb = mndAcquireDb(pMnode, usedbReq.db);
......
......@@ -89,6 +89,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
SRpcMsg rpcMsg;
int msgLen = 0;
int32_t code = TSDB_CODE_VND_APP_ERROR;
char tableFName[TSDB_TABLE_FNAME_LEN];
STableInfoReq infoReq = {0};
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
......@@ -96,6 +97,16 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto _exit;
}
metaRsp.dbId = pVnode->config.dbId;
memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
strcpy(metaRsp.tbName, infoReq.tbName);
sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
code = vnodeValidateTableHash(&pVnode->config, tableFName);
if (code) {
goto _exit;
}
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, infoReq.tbName, &uid);
if (pTbCfg == NULL) {
code = TSDB_CODE_VND_TB_NOT_EXIST;
......@@ -132,9 +143,6 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto _exit;
}
metaRsp.dbId = pVnode->config.dbId;
memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
strcpy(metaRsp.tbName, infoReq.tbName);
if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(metaRsp.stbName, pStbCfg->name);
metaRsp.suid = pTbCfg->ctbCfg.suid;
......
......@@ -30,6 +30,7 @@ extern "C" {
#define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_RENT_SLOT_SECOND 1.5
......
......@@ -190,7 +190,7 @@ void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
dbCache = (SCtgDBCache *)pIter;
taosHashGetKey((void **)&dbFName, &len);
dbFName = taosHashGetKey(pIter, &len);
int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0;
int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0;
......@@ -384,6 +384,11 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
char *p = strchr(dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) {
dbFName = p + 1;
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId;
......@@ -466,6 +471,11 @@ int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t d
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
char *p = strchr(dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) {
dbFName = p + 1;
}
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->pCtg = pCtg;
msg->dbId = dbId;
......@@ -493,6 +503,11 @@ int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, boo
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
char *p = strchr(output->dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) {
memmove(output->dbFName, p + 1, strlen(p + 1));
}
msg->pCtg = pCtg;
msg->output = output;
......@@ -562,6 +577,11 @@ void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
char *p = strchr(dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) {
dbFName = p + 1;
}
SCtgDBCache *dbCache = NULL;
if (acquire) {
dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
......@@ -927,7 +947,7 @@ int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg
return ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, dbFName, (char *)pTableName->tname, output);
}
int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
int32_t ctgGetTableMetaFromVnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
......@@ -977,6 +997,32 @@ int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
int32_t code = 0;
int32_t retryNum = 0;
while (retryNum < CTG_DEFAULT_MAX_RETRY_TIMES) {
code = ctgGetTableMetaFromVnodeImpl(pCtg, pTrans, pMgmtEps, pTableName, vgroupInfo, output);
if (code) {
if (TSDB_CODE_VND_HASH_MISMATCH == code) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
code = catalogRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName);
if (code != TSDB_CODE_SUCCESS) {
break;
}
++retryNum;
continue;
}
}
break;
}
CTG_RET(code);
}
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
switch (hashMethod) {
......@@ -1344,10 +1390,6 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
if (CTG_IS_INF_DBNAME(dbFName)) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId);
......@@ -1634,13 +1676,13 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
bool inCache = false;
int32_t code = 0;
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
if (inCache && !forceUpdate) {
if (inCache) {
return TSDB_CODE_SUCCESS;
}
......@@ -1648,13 +1690,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const
SBuildUseDBInput input = {0};
tstrncpy(input.db, dbFName, tListLen(input.db));
if (inCache) {
input.dbId = (*dbCache)->dbId;
input.vgVersion = (*dbCache)->vgInfo->vgVersion;
input.numOfTable = (*dbCache)->vgInfo->numOfTable;
} else {
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
}
code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut);
if (code) {
......@@ -1998,11 +2034,6 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
char *p = strchr(output->dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) {
memmove(output->dbFName, p + 1, strlen(p + 1));
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId);
......@@ -2185,7 +2216,7 @@ int32_t ctgGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps
tNameGetFullDbName(pTableName, db);
SHashObj *vgHash = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, &dbCache, &vgInfo));
if (dbCache) {
vgHash = dbCache->vgInfo->vgHash;
......@@ -2432,7 +2463,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SArray** vgroupList) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
......@@ -2444,7 +2475,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, c
SArray *vgList = NULL;
SHashObj *vgHash = NULL;
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, &dbCache, &vgInfo));
if (dbCache) {
vgHash = dbCache->vgInfo->vgHash;
} else {
......@@ -2478,35 +2509,14 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId
int32_t code = 0;
if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
ctgFreeVgInfo(dbInfo);
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId;
msg->dbInfo = dbInfo;
action.data = msg;
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
dbInfo = NULL;
CTG_API_LEAVE(code);
code = ctgPushUpdateVgMsgInQueue(pCtg, dbFName, dbId, dbInfo, false);
_return:
ctgFreeVgInfo(dbInfo);
tfree(msg);
CTG_API_LEAVE(code);
}
......@@ -2681,7 +2691,7 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pM
tNameGetFullDbName(pTableName, db);
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, false, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
......
......@@ -713,7 +713,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
int32_t n = 0;
while (!ctgTestStop) {
code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, &vgList);
if (code) {
assert(0);
}
......@@ -2009,7 +2009,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
......
......@@ -1115,7 +1115,7 @@ static int32_t translateShowTables(STranslateContext* pCxt) {
tNameGetFullDbName(&name, dbFname);
SArray* array = NULL;
int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, false, &array);
int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, &array);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -469,7 +469,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if (addNum <= 0) {
SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
return TSDB_CODE_QRY_INVALID_INPUT;
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
/*
......@@ -778,8 +778,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode,
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
} else {
SCH_TASK_DLOG("task already done, no more failure process, status:%d", SCH_GET_TASK_STATUS(pTask));
return TSDB_CODE_SUCCESS;
SCH_TASK_ELOG("task not in executing list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
......@@ -1415,6 +1415,12 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_RET(atomic_load_32(&pJob->errCode));
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
}
SSubplan *plan = pTask->plan;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
......@@ -1429,12 +1435,6 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
// NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
}
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册