提交 503a6110 编写于 作者: H Haojun Liao

[td-5250]<enhance>: improve the performance of multi-table meta bulk load.

上级 eabbe88f
...@@ -310,7 +310,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); ...@@ -310,7 +310,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet); int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp); int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp, bool metaClone);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray); int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
......
...@@ -7738,7 +7738,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -7738,7 +7738,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// load the table meta for a given table name list // load the table meta for a given table name list
if (taosArrayGetSize(plist) > 0 || taosArrayGetSize(pVgroupList) > 0) { if (taosArrayGetSize(plist) > 0 || taosArrayGetSize(pVgroupList) > 0) {
code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList, tscTableMetaCallBack); code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList, tscTableMetaCallBack, true);
} }
_end: _end:
......
...@@ -1873,7 +1873,6 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) { ...@@ -1873,7 +1873,6 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) {
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->uid = htobe64(pMetaMsg->uid);
// pMetaMsg->contLen = htonl(pMetaMsg->contLen);
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) && if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
...@@ -1938,13 +1937,14 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet ...@@ -1938,13 +1937,14 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet
int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN); int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
// The super tableMeta already exists, create it according to tableMeta and add it to hash map // The super tableMeta already exists, create it according to tableMeta and add it to hash map
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg); if (updateSTable) {
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size);
assert(code == TSDB_CODE_SUCCESS);
uint32_t size = tscGetTableMetaSize(pSupTableMeta); tfree(pSupTableMeta);
int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size); }
assert(code == TSDB_CODE_SUCCESS);
tfree(pSupTableMeta);
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta); CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta));
...@@ -2049,10 +2049,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2049,10 +2049,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
} }
SSqlCmd *pParentCmd = &pParentSql->cmd; SSqlCmd *pParentCmd = &pParentSql->cmd;
SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); SHashObj *pSet = taosHashInit(pMultiMeta->numOfVgroup, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
char* pMsg = pMultiMeta->meta;
char* buf = NULL; char* buf = NULL;
char* pMsg = pMultiMeta->meta;
if (pMultiMeta->compressed) { if (pMultiMeta->compressed) {
buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta)); buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta));
int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1, int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
...@@ -2086,22 +2086,25 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2086,22 +2086,25 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
SName sn = {0}; SName sn = {0};
tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
const char* tableName = tNameGetTableName(&sn); if (pMultiMeta->metaClone == 1 || pTableMeta->tableType == TSDB_SUPER_TABLE) {
size_t keyLen = strlen(tableName); STableMetaVgroupInfo p = {.pTableMeta = pTableMeta,};
STableMetaVgroupInfo p = {.pTableMeta = pTableMeta,}; const char* tableName = tNameGetTableName(&sn);
taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, &p, sizeof(STableMetaVgroupInfo)); size_t keyLen = strlen(tableName);
taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, &p, sizeof(STableMetaVgroupInfo));
}
bool addToBuf = false; // for each super table, only update meta information once
if (taosHashGet(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid)) == NULL) { bool updateStableMeta = false;
addToBuf = true; if (pTableMeta->tableType == TSDB_CHILD_TABLE && taosHashGet(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid)) == NULL) {
taosHashPut(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid), "", 0); updateStableMeta = true;
taosHashPut(pSet, &pTableMeta->suid, sizeof(pMetaMsg->suid), "", 0);
} }
// create the tableMeta and add it into the TableMeta map // create the tableMeta and add it into the TableMeta map
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, addToBuf); doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, updateStableMeta);
// if the vgroup is not updated in current process, update it. // for each vgroup, only update the information once.
int64_t vgId = pMetaMsg->vgroup.vgId; int64_t vgId = pMetaMsg->vgroup.vgId;
if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup); doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);
...@@ -2513,7 +2516,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn ...@@ -2513,7 +2516,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
return code; return code;
} }
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp) { int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp, bool metaClone) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (NULL == pNew) { if (NULL == pNew) {
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self); tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
...@@ -2535,6 +2538,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg ...@@ -2535,6 +2538,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
} }
SMultiTableInfoMsg* pInfo = (SMultiTableInfoMsg*) pNew->cmd.payload; SMultiTableInfoMsg* pInfo = (SMultiTableInfoMsg*) pNew->cmd.payload;
pInfo->metaClone = metaClone? 1:0;
pInfo->numOfTables = htonl((uint32_t) taosArrayGetSize(pNameList)); pInfo->numOfTables = htonl((uint32_t) taosArrayGetSize(pNameList));
pInfo->numOfVgroups = htonl((uint32_t) taosArrayGetSize(pVgroupNameList)); pInfo->numOfVgroups = htonl((uint32_t) taosArrayGetSize(pVgroupNameList));
......
...@@ -948,7 +948,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -948,7 +948,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t length = (int32_t)strlen(tableNameList); int32_t length = (int32_t)strlen(tableNameList);
if (length > MAX_TABLE_NAME_LENGTH) { if (length > MAX_TABLE_NAME_LENGTH) {
...@@ -967,6 +966,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -967,6 +966,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
strtolower(str, tableNameList); strtolower(str, tableNameList);
SArray* plist = taosArrayInit(4, POINTER_BYTES); SArray* plist = taosArrayInit(4, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(4, POINTER_BYTES); SArray* vgroupList = taosArrayInit(4, POINTER_BYTES);
if (plist == NULL || vgroupList == NULL) {
tfree(str);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist); int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str); free(str);
...@@ -976,10 +980,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -976,10 +980,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return code; return code;
} }
pSql->cmd.pTableMetaMap = taosHashInit(taosArrayGetSize(plist), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
registerSqlObj(pSql); registerSqlObj(pSql);
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj); tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback); code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
} }
......
...@@ -710,6 +710,7 @@ typedef struct { ...@@ -710,6 +710,7 @@ typedef struct {
} STableInfoMsg; } STableInfoMsg;
typedef struct { typedef struct {
uint8_t metaClone; // create local clone of the cached table meta
int32_t numOfVgroups; int32_t numOfVgroups;
int32_t numOfTables; int32_t numOfTables;
char tableNames[]; char tableNames[];
...@@ -761,10 +762,11 @@ typedef struct STableMetaMsg { ...@@ -761,10 +762,11 @@ typedef struct STableMetaMsg {
typedef struct SMultiTableMeta { typedef struct SMultiTableMeta {
int32_t numOfTables; int32_t numOfTables;
int32_t numOfVgroup; uint8_t metaClone; // make meta clone after retrieve meta from mnode
uint32_t contLen:31; uint8_t compressed; // denote if compressed or not
uint8_t compressed:1; // denote if compressed or not int32_t numOfVgroup; // number of tables
uint32_t rawLen; // size before compress uint32_t contLen; // current content length
uint32_t rawLen; // size before compressed
char meta[]; char meta[];
} SMultiTableMeta; } SMultiTableMeta;
......
...@@ -3036,6 +3036,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3036,6 +3036,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
int32_t len = tsCompressString(pMultiMeta->meta, (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta), 1, int32_t len = tsCompressString(pMultiMeta->meta, (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
tmp + sizeof(SMultiTableMeta), (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta) + 2, ONE_STAGE_COMP, NULL, 0); tmp + sizeof(SMultiTableMeta), (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta) + 2, ONE_STAGE_COMP, NULL, 0);
pMultiMeta->metaClone = pInfo->metaClone;
pMultiMeta->rawLen = pMultiMeta->contLen; pMultiMeta->rawLen = pMultiMeta->contLen;
if (len == -1 || len + sizeof(SMultiTableMeta) >= pMultiMeta->contLen + 2) { // compress failed, do not compress this binary data if (len == -1 || len + sizeof(SMultiTableMeta) >= pMultiMeta->contLen + 2) { // compress failed, do not compress this binary data
pMultiMeta->compressed = 0; pMultiMeta->compressed = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册