提交 25963577 编写于 作者: D dapan

feature/qnode3

上级 6be54c11
......@@ -82,11 +82,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int msgLen = 0;
int32_t code = TSDB_CODE_VND_APP_ERROR;
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", pReq->dbFName, pReq->tbName);
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, tbFName, &uid);
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tbName, &uid);
if (pTbCfg == NULL) {
code = TSDB_CODE_VND_TB_NOT_EXIST;
goto _exit;
......@@ -124,10 +120,10 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName));
strcpy(pTbMetaMsg->tbName, pReq->tbName);
if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbName, pStbCfg->name + strlen(pReq->dbFName) + 1);
strcpy(pTbMetaMsg->stbName, pStbCfg->name);
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
} else if (pTbCfg->type == META_SUPER_TABLE) {
strcpy(pTbMetaMsg->stbName, pTbCfg->name + strlen(pReq->dbFName) + 1);
strcpy(pTbMetaMsg->stbName, pTbCfg->name);
pTbMetaMsg->suid = htobe64(uid);
}
pTbMetaMsg->numOfTags = htonl(nTagCols);
......
......@@ -248,6 +248,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
tbMeta = *pTableMeta;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("Got tbmeta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, db, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -257,6 +258,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgError("stable not in stbCache, suid:%"PRIx64, tbMeta->suid);
tfree(*pTableMeta);
*exist = 0;
......@@ -265,6 +267,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
if ((*stbMeta)->suid != tbMeta->suid) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
tfree(*pTableMeta);
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
......@@ -274,6 +277,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
*pTableMeta = realloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgError("realloc size[%d] failed", metaSize);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -282,6 +286,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", db, pTableName->tname);
return TSDB_CODE_SUCCESS;
......@@ -1318,6 +1324,9 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
taosHashCleanup(dbCache->vgInfo->vgHash);
dbCache->vgInfo->vgHash = NULL;
}
tfree(dbCache->vgInfo);
dbCache->vgInfo = dbInfo;
}
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
......
......@@ -128,7 +128,10 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(sn.dbname, "db1");
strcpy(sn.tname, ctgTestSTablename);
strcpy(output->dbFName, cn.dbname);
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&cn, db);
strcpy(output->dbFName, db);
SET_META_TYPE_BOTH_TABLE(output->metaType);
strcpy(output->ctbName, cn.tname);
......@@ -171,10 +174,11 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(s->name, "tag1s");
}
void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) {
static int32_t vgVersion = ctgTestVgVersion + 1;
int32_t vgNum = 0;
SVgroupInfo vgInfo = {0};
SDBVgroupInfo *dbVgroup = (SDBVgroupInfo *)calloc(1, sizeof(SDBVgroupInfo));
dbVgroup->vgVersion = vgVersion++;
......@@ -201,6 +205,8 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
taosHashPut(dbVgroup->vgHash, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
}
*pdbVgroup = dbVgroup;
}
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
......@@ -370,7 +376,7 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg,
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
strcpy(rspMsg->dbFName, ctgTestDbname);
strcpy(rspMsg->tbName, ctgTestSTablename);
sprintf(rspMsg->tbName, "%s_%d", ctgTestSTablename, idx);
sprintf(rspMsg->stbName, "%s_%d", ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
......@@ -589,12 +595,12 @@ void *ctgTestGetDbVgroupThread(void *param) {
void *ctgTestSetDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo dbVgroup = {0};
SDBVgroupInfo *dbVgroup = NULL;
int32_t n = 0;
while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup);
if (code) {
assert(0);
}
......@@ -669,6 +675,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
return NULL;
}
TEST(tableMeta, normalTable) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1062,9 +1069,11 @@ TEST(dbVgroup, getSetDbVgroupCase) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SDBVgroupInfo *dbVgroup = NULL;
SArray *vgList = NULL;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndNormalMeta();
initQueryModuleMsgHandle();
......@@ -1099,7 +1108,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
taosArrayDestroy(vgList);
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup);
ASSERT_EQ(code, 0);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
......@@ -1169,6 +1178,7 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy();
}
TEST(multiThread, ctableMeta) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1178,6 +1188,8 @@ TEST(multiThread, ctableMeta) {
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndChildMeta();
initQueryModuleMsgHandle();
......@@ -1212,11 +1224,13 @@ TEST(multiThread, ctableMeta) {
}
ctgTestStop = true;
sleep(1);
sleep(2);
catalogDestroy();
}
TEST(rentTest, allRent) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1229,6 +1243,8 @@ TEST(rentTest, allRent) {
SSTableMetaVersion *stable = NULL;
uint32_t num = 0;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
initQueryModuleMsgHandle();
......
......@@ -131,7 +131,7 @@ typedef struct SSchJob {
SQueryProfileSummary summary;
} SSchJob;
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
......
......@@ -779,14 +779,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
pErrTask = par;
atomic_add_fetch_32(&par->childReady, 1);
int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1);
SCH_LOCK(SCH_WRITE, &par->lock);
SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr};
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source);
SCH_UNLOCK(SCH_WRITE, &par->lock);
if (SCH_TASK_READY_TO_LUNCH(par)) {
if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
SCH_ERR_RET(schLaunchTask(pJob, par));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册