From 012d216b952a7cf237bdbbe91d2e0eeb09577314 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 31 Dec 2021 18:12:45 +0800 Subject: [PATCH] [TS-983](connector,query,insert,other,tools,taosAdapter):create subtable to specific vgroups by _taos_meta_sync_table_name_taos_ --- src/common/inc/tglobal.h | 1 + src/common/src/tglobal.c | 11 ++++++++++ src/kit/shell/src/shellCheck.c | 2 +- src/mnode/inc/mnodeVgroup.h | 2 +- src/mnode/src/mnodeTable.c | 31 +++++++++++++++++++++++++-- src/mnode/src/mnodeVgroup.c | 39 +++++++++++++++++++++++++++++++++- 6 files changed, 81 insertions(+), 5 deletions(-) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 890bed123b..4c5e78890d 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -64,6 +64,7 @@ extern int32_t tsCompressMsgSize; extern int32_t tsCompressColData; extern int32_t tsMaxNumOfDistinctResults; extern char tsTempDir[]; +extern int32_t tsMetaSyncOption; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index b87ed82d69..5503e66b16 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -65,6 +65,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string int8_t tsEnableCoreFile = 0; int32_t tsMaxBinaryDisplayWidth = 30; +int32_t tsMetaSyncOption = 0; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -1749,6 +1750,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_MB; taosInitConfigOption(cfg); + cfg.option = "metaSyncOption"; + cfg.ptr = &tsMetaSyncOption; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + #ifdef TD_TSZ // lossy compress cfg.option = "lossyColumns"; diff --git a/src/kit/shell/src/shellCheck.c b/src/kit/shell/src/shellCheck.c index 43256719e1..dfc5d83b9f 100644 --- a/src/kit/shell/src/shellCheck.c +++ b/src/kit/shell/src/shellCheck.c @@ -131,7 +131,7 @@ static void *shellCheckThreadFp(void *arg) { char *tbname = tbNames[t]; if (tbname == NULL) break; - snprintf(sql, SHELL_SQL_LEN, "select last_row(_c0) from %s;", tbname); + snprintf(sql, SHELL_SQL_LEN, "select count(*) from %s;", tbname); TAOS_RES *pSql = taos_query(pThread->taos, sql); int32_t code = taos_errno(pSql); diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index aff0411fdd..bda4bbf320 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -43,7 +43,7 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_ int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg); void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle); void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle); -int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid); +int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid, int32_t vgId); int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCheck); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 4f277efd34..d468739e48 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -48,6 +48,11 @@ #define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_SEC 14 +// informal +#define META_SYNC_TABLE_NAME "_taos_meta_sync_table_name_taos_" +#define META_SYNC_TABLE_NAME_LEN 32 +// informal + int64_t tsCTableRid = -1; static void * tsChildTableSdb; int64_t tsSTableRid = -1; @@ -1726,6 +1731,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, cols++; numOfRows++; + + mDebug("stable: %s, uid: %" PRIu64, prefix, pTable->uid); + mnodeDecTableRef(pTable); } @@ -2227,9 +2235,28 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { if (pMsg->pTable == NULL) { SVgObj *pVgroup = NULL; int32_t tid = 0; - code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid); + int32_t vgId = 0; + + if (tsMetaSyncOption) { + char tbName[TSDB_TABLE_NAME_LEN] = "\0"; + strncpy(tbName, pCreate->tableName, TSDB_TABLE_NAME_LEN); + char *pTbName = strtok(tbName, "."); + if (pTbName) { + pTbName = strtok(NULL, "."); + if (pTbName) { + pTbName = strtok(NULL, "."); + if (pTbName) { + if (0 == strncmp(META_SYNC_TABLE_NAME, pTbName, META_SYNC_TABLE_NAME_LEN)) { + vgId = atoi(pTbName + META_SYNC_TABLE_NAME_LEN); + } + } + } + } + } + + code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid, vgId); if (code != TSDB_CODE_SUCCESS) { - mDebug("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle, + mError("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName, tstrerror(code)); return code; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index fd6d60c034..46dc07f0f2 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -428,10 +428,47 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) { return TSDB_CODE_SUCCESS; } -int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid) { +int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid, int32_t vgId) { SDbObj *pDb = pMsg->pDb; pthread_mutex_lock(&pDb->mutex); + if (vgId > 0) { + for (int32_t v = 0; v < pDb->numOfVgroups; ++v) { + SVgObj *pVgroup = pDb->vgList[v]; + if (pVgroup == NULL) { + mError("db:%s, vgroup: %d is null", pDb->name, v); + pthread_mutex_unlock(&pDb->mutex); + return TSDB_CODE_MND_APP_ERROR; + } + + if (pVgroup->vgId != (uint32_t)vgId) { // find the target vgId + continue; + } + + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid <= 0) { + int curMaxId = taosIdPoolMaxSize(pVgroup->idPool); + if ((taosUpdateIdPool(pVgroup->idPool, curMaxId + 1) < 0) || ((sid = taosAllocateId(pVgroup->idPool)) <= 0)) { + mError("msg:%p, app:%p db:%s, no enough sid in vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pDb->name, + pVgroup->vgId); + pthread_mutex_unlock(&pDb->mutex); + return TSDB_CODE_MND_APP_ERROR; + } + } + mDebug("vgId:%d, alloc tid:%d", pVgroup->vgId, sid); + + *pSid = sid; + *ppVgroup = pVgroup; + pDb->vgListIndex = v; + + pthread_mutex_unlock(&pDb->mutex); + return TSDB_CODE_SUCCESS; + } + pthread_mutex_unlock(&pDb->mutex); + mError("db:%s, vgroup: %d not exist", pDb->name, vgId); + return TSDB_CODE_MND_APP_ERROR; + } + for (int32_t v = 0; v < pDb->numOfVgroups; ++v) { int vgIndex = (v + pDb->vgListIndex) % pDb->numOfVgroups; SVgObj *pVgroup = pDb->vgList[vgIndex]; -- GitLab