未验证 提交 d32d44f1 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9609 from taosdata/feature/qnode

Feature/qnode
...@@ -50,6 +50,7 @@ typedef struct SCatalogCfg { ...@@ -50,6 +50,7 @@ typedef struct SCatalogCfg {
uint32_t maxDBCacheNum; uint32_t maxDBCacheNum;
} SCatalogCfg; } SCatalogCfg;
int32_t catalogInit(SCatalogCfg *cfg); int32_t catalogInit(SCatalogCfg *cfg);
/** /**
...@@ -87,15 +88,28 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB ...@@ -87,15 +88,28 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
*/ */
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/**
* Get a super table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/** /**
* Force renew a table's local cached meta data. * Force renew a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle) * @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object) * @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs) * @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name) * @param pTableName (input, table name, NOT including db name)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code * @return error code
*/ */
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName); int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
/** /**
* Force renew a table's local cached meta data and get the new one. * Force renew a table's local cached meta data and get the new one.
...@@ -104,9 +118,10 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co ...@@ -104,9 +118,10 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co
* @param pMgmtEps (input, mnode EPs) * @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name) * @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller) * @param pTableMeta(output, table meta data, NEED to free it by calller)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code * @return error code
*/ */
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
/** /**
......
...@@ -64,6 +64,14 @@ typedef struct SCatalogMgmt { ...@@ -64,6 +64,14 @@ typedef struct SCatalogMgmt {
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_IS_STABLE(isSTable) (1 == (isSTable))
#define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable))
#define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0)
#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0)
#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID)
#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0)
......
...@@ -105,6 +105,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN ...@@ -105,6 +105,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
} }
*exist = 1; *exist = 1;
tbMeta = *pTableMeta;
if (tbMeta->tableType != TSDB_CHILD_TABLE) { if (tbMeta->tableType != TSDB_CHILD_TABLE) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -143,6 +145,29 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN ...@@ -143,6 +145,29 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) {
if (NULL == pCatalog->tableCache.cache) {
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
size_t sz = 0;
STableMeta *pTableMeta = NULL;
taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)&pTableMeta, &sz);
if (NULL == pTableMeta) {
return TSDB_CODE_SUCCESS;
}
*tbType = pTableMeta->tableType;
return TSDB_CODE_SUCCESS;
}
void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
epSet->inUse = 0; epSet->inUse = 0;
epSet->numOfEps = vgroupInfo->numOfEps; epSet->numOfEps = vgroupInfo->numOfEps;
...@@ -153,14 +178,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { ...@@ -153,14 +178,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
} }
} }
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
char *msg = NULL; char *msg = NULL;
SEpSet *pVnodeEpSet = NULL; SEpSet *pVnodeEpSet = NULL;
...@@ -179,6 +197,12 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE ...@@ -179,6 +197,12 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
output->metaNum = 0;
ctgDebug("tbmeta:%s not exist in mnode", tbFullName);
return TSDB_CODE_SUCCESS;
}
ctgError("error rsp for table meta, code:%x", rpcRsp.code); ctgError("error rsp for table meta, code:%x", rpcRsp.code);
CTG_ERR_RET(rpcRsp.code); CTG_ERR_RET(rpcRsp.code);
} }
...@@ -188,6 +212,13 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE ...@@ -188,6 +212,13 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
return ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, tbFullName, output);
}
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
...@@ -197,7 +228,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE ...@@ -197,7 +228,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
char dbFullName[TSDB_DB_FNAME_LEN]; char dbFullName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFullName); tNameGetFullDbName(pTableName, dbFullName);
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname}; SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname};
char *msg = NULL; char *msg = NULL;
SEpSet *pVnodeEpSet = NULL; SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
...@@ -218,6 +249,12 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE ...@@ -218,6 +249,12 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
output->metaNum = 0;
ctgDebug("tbmeta:%s not exist in vnode", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
ctgError("error rsp for table meta, code:%x", rpcRsp.code); ctgError("error rsp for table meta, code:%x", rpcRsp.code);
CTG_ERR_RET(rpcRsp.code); CTG_ERR_RET(rpcRsp.code);
} }
...@@ -322,22 +359,28 @@ _return: ...@@ -322,22 +359,28 @@ _return:
CTG_RET(TSDB_CODE_SUCCESS); CTG_RET(TSDB_CODE_SUCCESS);
} }
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) { int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
int32_t exist = 0; int32_t exist = 0;
if (!forceUpdate) { if (!forceUpdate) {
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
if (exist) { if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else if (CTG_IS_UNKNOWN_STABLE(isSTable)) {
int32_t tbType = 0;
CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType));
CTG_SET_STABLE(isSTable, tbType);
} }
CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pTableName)); CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable));
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
...@@ -364,19 +407,27 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ...@@ -364,19 +407,27 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
} }
if (NULL == pCatalog->tableCache.cache) { if (NULL == pCatalog->tableCache.cache) {
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.cache) { if (NULL == cache) {
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) {
taosHashCleanup(cache);
}
} }
if (NULL == pCatalog->tableCache.stableCache) { if (NULL == pCatalog->tableCache.stableCache) {
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.stableCache) { if (NULL == cache) {
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) {
taosHashCleanup(cache);
}
} }
if (output->metaNum == 2) { if (output->metaNum == 2) {
...@@ -481,6 +532,50 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD ...@@ -481,6 +532,50 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SVgroupInfo vgroupInfo = {0};
int32_t code = 0;
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
STableMetaOutput voutput = {0};
STableMetaOutput moutput = {0};
STableMetaOutput *output = &voutput;
if (CTG_IS_STABLE(isSTable)) {
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));
if (0 == moutput.metaNum) {
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
} else {
output = &moutput;
}
} else {
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
if (voutput.metaNum > 0 && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
}
}
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output));
_return:
tfree(voutput.tbMeta);
tfree(moutput.tbMeta);
CTG_RET(code);
}
int32_t catalogInit(SCatalogCfg *cfg) { int32_t catalogInit(SCatalogCfg *cfg) {
if (ctgMgmt.pCluster) { if (ctgMgmt.pCluster) {
...@@ -643,11 +738,15 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB ...@@ -643,11 +738,15 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
} }
if (NULL == pCatalog->dbCache.cache) { if (NULL == pCatalog->dbCache.cache) {
pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->dbCache.cache) { if (NULL == cache) {
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
} }
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) {
taosHashCleanup(cache);
}
} else { } else {
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
} }
...@@ -672,34 +771,23 @@ _return: ...@@ -672,34 +771,23 @@ _return:
} }
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta); return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1);
} }
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) { int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1);
}
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
SVgroupInfo vgroupInfo = {0}; return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable);
int32_t code = 0;
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
STableMetaOutput output = {0};
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
_return:
tfree(output.tbMeta);
CTG_RET(code);
} }
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta); return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable);
} }
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
......
...@@ -45,7 +45,7 @@ void ctgTestSetPrepareSTableMeta(); ...@@ -45,7 +45,7 @@ void ctgTestSetPrepareSTableMeta();
bool ctgTestStop = false; bool ctgTestStop = false;
bool ctgTestEnableSleep = false; bool ctgTestEnableSleep = false;
bool ctgTestDeadLoop = true; bool ctgTestDeadLoop = false;
int32_t ctgTestCurrentVgVersion = 0; int32_t ctgTestCurrentVgVersion = 0;
int32_t ctgTestVgVersion = 1; int32_t ctgTestVgVersion = 1;
...@@ -600,7 +600,6 @@ void *ctgTestSetCtableMetaThread(void *param) { ...@@ -600,7 +600,6 @@ void *ctgTestSetCtableMetaThread(void *param) {
} }
#if 0
TEST(tableMeta, normalTable) { TEST(tableMeta, normalTable) {
struct SCatalog* pCtg = NULL; struct SCatalog* pCtg = NULL;
...@@ -768,7 +767,7 @@ TEST(tableMeta, superTableCase) { ...@@ -768,7 +767,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
tableMeta = NULL; tableMeta = NULL;
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 9); ASSERT_EQ(tableMeta->vgId, 9);
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
...@@ -999,8 +998,6 @@ TEST(multiThread, getSetDbVgroupCase) { ...@@ -999,8 +998,6 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy(); catalogDestroy();
} }
#endif
TEST(multiThread, ctableMeta) { TEST(multiThread, ctableMeta) {
struct SCatalog* pCtg = NULL; struct SCatalog* pCtg = NULL;
......
...@@ -114,8 +114,9 @@ typedef struct SSchJob { ...@@ -114,8 +114,9 @@ typedef struct SSchJob {
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_JOB_ELOG(param, ...) qError("QID:% "PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
......
...@@ -270,6 +270,8 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { ...@@ -270,6 +270,8 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("push to %s list", "execTasks");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -284,6 +286,8 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { ...@@ -284,6 +286,8 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("push to %s list", "succTasks");
*moved = true; *moved = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -299,6 +303,8 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { ...@@ -299,6 +303,8 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("push to %s list", "failTasks");
*moved = true; *moved = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -372,7 +378,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { ...@@ -372,7 +378,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
if (!moved) { if (!moved) {
SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status); SCH_TASK_ELOG("task may already moved, status:%d", task->status);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -447,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { ...@@ -447,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
if (!needRetry) { if (!needRetry) {
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
if (!moved) { if (!moved) {
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); SCH_TASK_ELOG("task may already moved, status:%d", task->status);
} }
if (SCH_TASK_NEED_WAIT_ALL(task)) { if (SCH_TASK_NEED_WAIT_ALL(task)) {
...@@ -491,7 +497,6 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms ...@@ -491,7 +497,6 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
goto _task_error; goto _task_error;
} }
} }
break; break;
} }
case TDMT_VND_SUBMIT_RSP: { case TDMT_VND_SUBMIT_RSP: {
...@@ -568,19 +573,25 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in ...@@ -568,19 +573,25 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
int32_t code = 0; int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param; SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); SSchJob **pjob = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) { if (NULL == pjob || NULL == (*pjob)) {
qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); SSchJob *job = *pjob;
if (NULL == task || NULL == (*task)) {
SSchTask **ptask = taosHashGet(job->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == ptask || NULL == (*ptask)) {
qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
SSchTask *task = *ptask;
SCH_TASK_DLOG("Got msg:%d, rspCode:%d", msgType, rspCode);
schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); schProcessRspMsg(job, task, msgType, pMsg->pData, pMsg->len, rspCode);
_return: _return:
tfree(param); tfree(param);
...@@ -810,7 +821,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { ...@@ -810,7 +821,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); SCH_ERR_RET(schSetTaskCondidateAddrs(job, task));
if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) {
SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_TASK_ELOG("no valid condidate node for task:%"PRIx64, task->taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册