提交 012d216b 编写于 作者: C Cary Xu

[TS-983]<feature>(connector,query,insert,other,tools,taosAdapter):create...

[TS-983]<feature>(connector,query,insert,other,tools,taosAdapter):create subtable to specific vgroups by _taos_meta_sync_table_name_taos_
上级 5a848d8f
...@@ -64,6 +64,7 @@ extern int32_t tsCompressMsgSize; ...@@ -64,6 +64,7 @@ extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData; extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults; extern int32_t tsMaxNumOfDistinctResults;
extern char tsTempDir[]; extern char tsTempDir[];
extern int32_t tsMetaSyncOption;
// query buffer management // query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
......
...@@ -65,6 +65,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0}; ...@@ -65,6 +65,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
int8_t tsEnableCoreFile = 0; int8_t tsEnableCoreFile = 0;
int32_t tsMaxBinaryDisplayWidth = 30; int32_t tsMaxBinaryDisplayWidth = 30;
int32_t tsMetaSyncOption = 0;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
...@@ -1749,6 +1750,16 @@ static void doInitGlobalConfig(void) { ...@@ -1749,6 +1750,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_MB; cfg.unitType = TAOS_CFG_UTYPE_MB;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "metaSyncOption";
cfg.ptr = &tsMetaSyncOption;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -131,7 +131,7 @@ static void *shellCheckThreadFp(void *arg) { ...@@ -131,7 +131,7 @@ static void *shellCheckThreadFp(void *arg) {
char *tbname = tbNames[t]; char *tbname = tbNames[t];
if (tbname == NULL) break; if (tbname == NULL) break;
snprintf(sql, SHELL_SQL_LEN, "select last_row(_c0) from %s;", tbname); snprintf(sql, SHELL_SQL_LEN, "select count(*) from %s;", tbname);
TAOS_RES *pSql = taos_query(pThread->taos, sql); TAOS_RES *pSql = taos_query(pThread->taos, sql);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
......
...@@ -43,7 +43,7 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_ ...@@ -43,7 +43,7 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_
int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg); int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg);
void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle); void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle);
void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle); void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle);
int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid); int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid, int32_t vgId);
int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCheck); int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCheck);
void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable);
......
...@@ -48,6 +48,11 @@ ...@@ -48,6 +48,11 @@
#define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_TIMES 10
#define CREATE_CTABLE_RETRY_SEC 14 #define CREATE_CTABLE_RETRY_SEC 14
// informal
#define META_SYNC_TABLE_NAME "_taos_meta_sync_table_name_taos_"
#define META_SYNC_TABLE_NAME_LEN 32
// informal
int64_t tsCTableRid = -1; int64_t tsCTableRid = -1;
static void * tsChildTableSdb; static void * tsChildTableSdb;
int64_t tsSTableRid = -1; int64_t tsSTableRid = -1;
...@@ -1726,6 +1731,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1726,6 +1731,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
cols++; cols++;
numOfRows++; numOfRows++;
mDebug("stable: %s, uid: %" PRIu64, prefix, pTable->uid);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
} }
...@@ -2227,9 +2235,28 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -2227,9 +2235,28 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t tid = 0; int32_t tid = 0;
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid); int32_t vgId = 0;
if (tsMetaSyncOption) {
char tbName[TSDB_TABLE_NAME_LEN] = "\0";
strncpy(tbName, pCreate->tableName, TSDB_TABLE_NAME_LEN);
char *pTbName = strtok(tbName, ".");
if (pTbName) {
pTbName = strtok(NULL, ".");
if (pTbName) {
pTbName = strtok(NULL, ".");
if (pTbName) {
if (0 == strncmp(META_SYNC_TABLE_NAME, pTbName, META_SYNC_TABLE_NAME_LEN)) {
vgId = atoi(pTbName + META_SYNC_TABLE_NAME_LEN);
}
}
}
}
}
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid, vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mDebug("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle, mError("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName, tstrerror(code)); pCreate->tableName, tstrerror(code));
return code; return code;
} }
......
...@@ -428,10 +428,47 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) { ...@@ -428,10 +428,47 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid) { int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid, int32_t vgId) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
pthread_mutex_lock(&pDb->mutex); pthread_mutex_lock(&pDb->mutex);
if (vgId > 0) {
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
SVgObj *pVgroup = pDb->vgList[v];
if (pVgroup == NULL) {
mError("db:%s, vgroup: %d is null", pDb->name, v);
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_APP_ERROR;
}
if (pVgroup->vgId != (uint32_t)vgId) { // find the target vgId
continue;
}
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
int curMaxId = taosIdPoolMaxSize(pVgroup->idPool);
if ((taosUpdateIdPool(pVgroup->idPool, curMaxId + 1) < 0) || ((sid = taosAllocateId(pVgroup->idPool)) <= 0)) {
mError("msg:%p, app:%p db:%s, no enough sid in vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pDb->name,
pVgroup->vgId);
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_APP_ERROR;
}
}
mDebug("vgId:%d, alloc tid:%d", pVgroup->vgId, sid);
*pSid = sid;
*ppVgroup = pVgroup;
pDb->vgListIndex = v;
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_SUCCESS;
}
pthread_mutex_unlock(&pDb->mutex);
mError("db:%s, vgroup: %d not exist", pDb->name, vgId);
return TSDB_CODE_MND_APP_ERROR;
}
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) { for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
int vgIndex = (v + pDb->vgListIndex) % pDb->numOfVgroups; int vgIndex = (v + pDb->vgListIndex) % pDb->numOfVgroups;
SVgObj *pVgroup = pDb->vgList[vgIndex]; SVgObj *pVgroup = pDb->vgList[vgIndex];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册