提交 bc769836 编写于 作者: D dapan1121

enh: launch tables in same db

上级 bdee43e3
...@@ -242,7 +242,6 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S ...@@ -242,7 +242,6 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S
goto _return; goto _return;
} }
int32_t sz = 0;
pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName)); pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) { if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName); ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
...@@ -282,7 +281,6 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid, ...@@ -282,7 +281,6 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid,
goto _return; goto _return;
} }
int32_t sz = 0;
char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid)); char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
if (NULL == stName) { if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName); ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
...@@ -2152,6 +2150,7 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet ...@@ -2152,6 +2150,7 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) { int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) {
int32_t tbNum = taosArrayGetSize(ctx->pNames); int32_t tbNum = taosArrayGetSize(ctx->pNames);
SName* fName = taosArrayGet(ctx->pNames, 0); SName* fName = taosArrayGet(ctx->pNames, 0);
...@@ -2206,6 +2205,276 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2206,6 +2205,276 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) {
int32_t tbNum = taosArrayGetSize(ctx->pNames);
int32_t fIdx = 0;
SName* pName = taosArrayGet(ctx->pNames, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0;
STableMeta* lastTableMeta = NULL;
if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(flag);
strcpy(dbFName, pName->dbname);
} else {
tNameGetFullDbName(pName, dbFName);
}
SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) {
SMetaRes res = {0};
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
}
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(ctx->pNames, i);
SMetaRes res = {0};
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
continue;
}
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
continue;
}
STableMeta* tbMeta = pCache->pMeta;
SCtgTbMetaCtx nctx = {0};
nctx.flag = flag;
nctx.tbInfo.inCache = true;
nctx.tbInfo.dbId = dbCache->dbId;
nctx.tbInfo.suid = tbMeta->suid;
nctx.tbInfo.tbType = tbMeta->tableType;
STableMeta* pTableMeta = NULL;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
int32_t metaSize = CTG_META_SIZE(tbMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
//ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pTbMetas, &res);
continue;
}
// PROCESS FOR CHILD TABLE
if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
cloneTableMeta(lastTableMeta, &pTableMeta);
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pTbMetas, &res);
continue;
}
int32_t metaSize = sizeof(SCTableMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s",
pName->tname, nctx.tbInfo.tbType, dbFName);
char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosMemoryFreeClear(pTableMeta);
continue;
}
pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
if (NULL == pCache) {
ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
taosHashRelease(dbCache->stbCache, stName);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosMemoryFreeClear(pTableMeta);
continue;
}
taosHashRelease(dbCache->stbCache, stName);
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosMemoryFreeClear(pTableMeta);
continue;
}
STableMeta* stbMeta = pCache->pMeta;
if (stbMeta->suid != nctx.tbInfo.suid) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosMemoryFreeClear(pTableMeta);
continue;
}
metaSize = CTG_META_SIZE(stbMeta);
pTableMeta = taosMemoryRealloc(pTableMeta, metaSize);
if (NULL == pTableMeta) {
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
res.pRes = pTableMeta;
taosArrayPush(ctx->pTbMetas, &res);
lastSuid = pTableMeta->suid;
lastTableMeta = pTableMeta;
}
if (NULL == ctx->pFetchs) {
TSWAP(*pTbMetas, ctx->pTbMetas);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) { int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
int32_t code = 0; int32_t code = 0;
......
...@@ -69,7 +69,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -69,7 +69,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
pTask->pBatchs = pBatchs; pTask->pBatchs = pBatchs;
pTask->msgIdx = rsp.msgIdx; pTask->msgIdx = rsp.msgIdx;
ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s", pJob->queryId, pTask->taskId, pTask->msgIdx, TMSG_INFO(taskMsg.msgType + 1));
(*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
} }
......
...@@ -462,3 +462,5 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { ...@@ -462,3 +462,5 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册