提交 d7fe6bb9 编写于 作者: K kailixu

enh: suuport create child/normal table by specified uid/tid

上级 7427a37f
...@@ -232,7 +232,7 @@ int32_t tsKeepTimeOffset = 0; ...@@ -232,7 +232,7 @@ int32_t tsKeepTimeOffset = 0;
int32_t tsDiskCfgNum = 0; int32_t tsDiskCfgNum = 0;
int32_t tsTopicBianryLen = 16000; int32_t tsTopicBianryLen = 16000;
int32_t tsMetaSyncOption = 1; int32_t tsMetaSyncOption = 0;
#ifndef _STORAGE #ifndef _STORAGE
SDiskCfg tsDiskCfg[1]; SDiskCfg tsDiskCfg[1];
......
...@@ -223,6 +223,8 @@ int32_t* taosGetErrno(); ...@@ -223,6 +223,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) //"Topic already exists) #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) //"Topic already exists)
#define TSDB_CODE_MND_INVALID_FORMAT TAOS_DEF_ERROR_CODE(0, 0x0396) //"Invalid format) #define TSDB_CODE_MND_INVALID_FORMAT TAOS_DEF_ERROR_CODE(0, 0x0396) //"Invalid format)
#define TSDB_CODE_MND_DUP_TID TAOS_DEF_ERROR_CODE(0, 0x0397) //"Duplicated tid)
#define TSDB_CODE_MND_ID_POOL_IS_FULL TAOS_DEF_ERROR_CODE(0, 0x0398) //"Id pool is full)
// dnode // dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed" #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed"
......
...@@ -64,6 +64,13 @@ static SHashObj *tsSTableUidHash; ...@@ -64,6 +64,13 @@ static SHashObj *tsSTableUidHash;
static int32_t tsChildTableUpdateSize; static int32_t tsChildTableUpdateSize;
static int32_t tsSuperTableUpdateSize; static int32_t tsSuperTableUpdateSize;
typedef struct {
int32_t vgId;
int32_t tid;
uint64_t uid;
uint64_t suid;
} SMetaInfo;
static void *mnodeGetChildTable(char *tableId); static void *mnodeGetChildTable(char *tableId);
static void *mnodeGetSuperTable(char *tableId); static void *mnodeGetSuperTable(char *tableId);
static void *mnodeGetSuperTableByUid(uint64_t uid); static void *mnodeGetSuperTableByUid(uint64_t uid);
...@@ -82,7 +89,7 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg); ...@@ -82,7 +89,7 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg); static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessMetaSyncCreateChildTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessMetaSyncCreateChildTableMsg(SMnodeMsg *pMsg, SMetaInfo *pInf);
static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg);
...@@ -2164,7 +2171,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -2164,7 +2171,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
} }
} }
static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, SMetaInfo *pInf) {
SVgObj *pVgroup = pMsg->pVgroup; SVgObj *pVgroup = pMsg->pVgroup;
SCMCreateTableMsg *p1 = pMsg->rpcMsg.pCont; SCMCreateTableMsg *p1 = pMsg->rpcMsg.pCont;
...@@ -2179,7 +2186,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2179,7 +2186,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
pTable->info.type = (pCreate->numOfColumns == 0) ? TSDB_CHILD_TABLE : TSDB_NORMAL_TABLE; pTable->info.type = (pCreate->numOfColumns == 0) ? TSDB_CHILD_TABLE : TSDB_NORMAL_TABLE;
pTable->info.tableId = strdup(pCreate->tableName); pTable->info.tableId = strdup(pCreate->tableName);
pTable->createdTime = taosGetTimestampMs(); pTable->createdTime = taosGetTimestampMs();
pTable->tid = tid; pTable->tid = pInf->tid;
pTable->vgId = pVgroup->vgId; pTable->vgId = pVgroup->vgId;
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
...@@ -2207,13 +2214,21 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2207,13 +2214,21 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
} }
pTable->suid = pMsg->pSTable->uid; pTable->suid = pMsg->pSTable->uid;
pTable->uid = mnodeCreateTableUid(pTable->vgId, pTable->tid); if (tsMetaSyncOption && pInf->uid != (uint64_t)-1) {
pTable->uid = pInf->uid;
} else {
pTable->uid = mnodeCreateTableUid(pTable->vgId, pTable->tid);
}
pTable->superTable = pMsg->pSTable; pTable->superTable = pMsg->pSTable;
} else { } else {
if (pTable->info.type == TSDB_SUPER_TABLE) { if (pTable->info.type == TSDB_SUPER_TABLE) {
pTable->uid = mnodeCreateSuperTableUid(); pTable->uid = mnodeCreateSuperTableUid();
} else { } else {
pTable->uid = mnodeCreateTableUid(pTable->vgId, pTable->tid); if (tsMetaSyncOption && pInf->uid != (uint64_t)-1) {
pTable->uid = pInf->uid;
} else {
pTable->uid = mnodeCreateTableUid(pTable->vgId, pTable->tid);
}
} }
pTable->sversion = 0; pTable->sversion = 0;
...@@ -2266,57 +2281,69 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2266,57 +2281,69 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
mError("msg:%p, app:%p table:%s, failed to create, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName, mError("msg:%p, app:%p table:%s, failed to create, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName,
tstrerror(code)); tstrerror(code));
} else { } else {
mDebug("msg:%p, app:%p table:%s, allocated in vgroup, vgId:%d sid:%d uid:%" PRIu64, pMsg, pMsg->rpcMsg.ahandle, mInfo("msg:%p, app:%p table:%s, allocated in vgroup, vgId:%d sid:%d uid:%" PRIu64, pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, pVgroup->vgId, pTable->tid, pTable->uid); pTable->info.tableId, pVgroup->vgId, pTable->tid, pTable->uid);
} }
return code; return code;
} }
static int32_t mnodeProcessMetaSyncCreateChildTableMsg(SMnodeMsg *pMsg, int32_t *tid, int32_t *vgId, uint64_t *uid, static int32_t mnodeProcessMetaSyncCreateChildTableMsg(SMnodeMsg *pMsg, SMetaInfo *pInf) {
uint64_t *suid) {
SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
int32_t code = 0; int32_t code = 0;
// 0.db0._taos_meta_sync_cret_mndtb_taos_vgId.suid.uid.tid // 0.db0._taos_meta_sync_cret_mndtb_taos_vgId.suid.uid.tid.tbName
if (strstr(pCreate->tableName, META_SYNC_CRET_MNDTB)) { if (strstr(pCreate->tableName, META_SYNC_CRET_MNDTB)) {
code = TSDB_CODE_MND_INVALID_FORMAT;
char realName[TSDB_TABLE_FNAME_LEN] = {0};
char *pTbName = strchr(pCreate->tableName, '.'); char *pTbName = strchr(pCreate->tableName, '.');
if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) { if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) {
if (0 == strncmp(META_SYNC_CRET_MNDTB, ++pTbName, META_SYNC_TABLE_NAME_LEN)) { if (0 == strncmp(META_SYNC_CRET_MNDTB, ++pTbName, META_SYNC_TABLE_NAME_LEN)) {
strncpy(realName, pCreate->tableName, POINTER_DISTANCE(pTbName, pCreate->tableName));
pTbName += META_SYNC_TABLE_NAME_LEN; pTbName += META_SYNC_TABLE_NAME_LEN;
*vgId = atoi(pTbName); pInf->vgId = atoi(pTbName);
if ((pTbName = strchr(pTbName, '.'))) { if ((pTbName = strchr(pTbName, '.'))) {
*suid = strtoull(++pTbName, NULL, 10); pInf->suid = strtoull(++pTbName, NULL, 10);
if ((pTbName = strchr(pTbName, '.'))) { if ((pTbName = strchr(pTbName, '.'))) {
*uid = strtoull(++pTbName, NULL, 10); pInf->uid = strtoull(++pTbName, NULL, 10);
if ((pTbName = strchr(pTbName, '.'))) { if ((pTbName = strchr(pTbName, '.'))) {
*tid = atoi(++pTbName); pInf->tid = atoi(++pTbName);
if ((pTbName = strchr(pTbName, '.'))) {
int32_t len = strlen(realName);
strncpy(realName + len, pTbName + 1, TSDB_TABLE_FNAME_LEN - len - 1);
code = 0;
}
} }
} }
} }
} }
} }
if (*tid <= 0 || *uid == (uint64_t)-1 || *vgId <= 0) { if (code != 0 || pInf->tid < 1 || pInf->uid == (uint64_t)-1 || pInf->vgId < 2) {
code = TSDB_CODE_MND_INVALID_FORMAT; code = TSDB_CODE_MND_INVALID_FORMAT;
mError("msg:%p, app:%p table:%s, failed to create table, reason:%s", pMsg, pMsg->rpcMsg.ahandle, mError("%s:%d msg:%p, app:%p table:%s, failed to create table, reason:%s", __func__, __LINE__, pMsg,
pCreate->tableName, tstrerror(code)); pMsg->rpcMsg.ahandle, pCreate->tableName, tstrerror(code));
return code; return code;
} }
mInfo("msg:%p, app:%p table:%s, start to create table, vgId:%d suid:%" PRIu64 " uid:%" PRIu64 " tid:%d", pMsg, mInfo("%s:%d msg:%p, app:%p table:%s, start to create table, vgId:%d suid:%" PRIu64 " uid:%" PRIu64
pMsg->rpcMsg.ahandle, pCreate->tableName, *vgId, *suid, *uid, *tid); " tid:%d realName:%s",
__func__, __LINE__, pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName, pInf->vgId, pInf->suid, pInf->uid,
pInf->tid, realName);
strncpy(pCreate->tableName, realName, TSDB_TABLE_FNAME_LEN);
} else if (strstr(pCreate->tableName, META_SYNC_TABLE_NAME)) { } else if (strstr(pCreate->tableName, META_SYNC_TABLE_NAME)) {
char *pTbName = strchr(pCreate->tableName, '.'); char *pTbName = strchr(pCreate->tableName, '.');
if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) { if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) {
if (0 == strncmp(META_SYNC_TABLE_NAME, ++pTbName, META_SYNC_TABLE_NAME_LEN)) { if (0 == strncmp(META_SYNC_TABLE_NAME, ++pTbName, META_SYNC_TABLE_NAME_LEN)) {
*vgId = atoi(pTbName + META_SYNC_TABLE_NAME_LEN); pInf->vgId = atoi(pTbName + META_SYNC_TABLE_NAME_LEN);
} }
} }
if (*vgId <= 0) { if (pInf->vgId < 2) {
code = TSDB_CODE_MND_INVALID_FORMAT; code = TSDB_CODE_MND_INVALID_FORMAT;
mError("msg:%p, app:%p table:%s, failed to create table, reason:%s", pMsg, pMsg->rpcMsg.ahandle, mError("%s:%d msg:%p, app:%p table:%s, failed to create table, reason:%s", __func__, __LINE__, pMsg,
pCreate->tableName, tstrerror(code)); pMsg->rpcMsg.ahandle, pCreate->tableName, tstrerror(code));
return code; return code;
} }
mInfo("%s:%d msg:%p, app:%p table:%s, start to create table, vgId:%d", __func__, __LINE__, pMsg,
pMsg->rpcMsg.ahandle, pCreate->tableName, pInf->vgId);
} }
return code; return code;
...@@ -2335,17 +2362,14 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -2335,17 +2362,14 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if (pMsg->retry == 0) { if (pMsg->retry == 0) {
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t tid = 0; SMetaInfo metaInfo = {.tid = 0, .vgId = 0, .uid = -1, .suid = -1};
int32_t vgId = 0;
uint64_t uid = -1;
uint64_t suid = -1;
if (tsMetaSyncOption) { if (tsMetaSyncOption) {
code = mnodeProcessMetaSyncCreateChildTableMsg(pMsg, &tid, &vgId, &uid, &suid); code = mnodeProcessMetaSyncCreateChildTableMsg(pMsg, &metaInfo);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) return code;
} }
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid, vgId); code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &metaInfo.tid, metaInfo.vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mInfo("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle, mInfo("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName, tstrerror(code)); pCreate->tableName, tstrerror(code));
...@@ -2359,7 +2383,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -2359,7 +2383,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
pMsg->pVgroup = pVgroup; pMsg->pVgroup = pVgroup;
mnodeIncVgroupRef(pVgroup); mnodeIncVgroupRef(pVgroup);
return mnodeDoCreateChildTable(pMsg, tid); return mnodeDoCreateChildTable(pMsg, &metaInfo);
} }
} else { } else {
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableName); if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableName);
......
...@@ -459,9 +459,8 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi ...@@ -459,9 +459,8 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
continue; continue;
} }
int32_t sid = 0;
if (*pSid <= 0) { if (*pSid <= 0) {
sid = taosAllocateId(pVgroup->idPool); int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) { if (sid <= 0) {
int curMaxId = taosIdPoolMaxSize(pVgroup->idPool); int curMaxId = taosIdPoolMaxSize(pVgroup->idPool);
if ((taosUpdateIdPool(pVgroup->idPool, curMaxId + 1) < 0) || ((sid = taosAllocateId(pVgroup->idPool)) <= 0)) { if ((taosUpdateIdPool(pVgroup->idPool, curMaxId + 1) < 0) || ((sid = taosAllocateId(pVgroup->idPool)) <= 0)) {
...@@ -471,14 +470,19 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi ...@@ -471,14 +470,19 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
} }
} }
*pSid = sid; // assignment
} else { } else {
int32_t code = taosAssignId(pVgroup->idPool, *pSid);
if (code != TSDB_CODE_SUCCESS) {
mError("msg:%p, app:%p db:%s, failed to assign tid:%d in vgId:%d since %s", pMsg, pMsg->rpcMsg.ahandle,
pDb->name, *pSid, pVgroup->vgId, tstrerror(code));
pthread_mutex_unlock(&pDb->mutex);
return code;
}
} }
mDebug("vgId:%d, alloc tid:%d", pVgroup->vgId, sid); mDebug("vgId:%d, alloc tid:%d", pVgroup->vgId, *pSid);
*pSid = sid;
*ppVgroup = pVgroup; *ppVgroup = pVgroup;
pDb->vgListIndex = v; pDb->vgListIndex = v;
......
...@@ -28,6 +28,8 @@ int taosIdPoolMaxSize(void *handle); ...@@ -28,6 +28,8 @@ int taosIdPoolMaxSize(void *handle);
int taosAllocateId(void *handle); int taosAllocateId(void *handle);
int taosAssignId(void *handle, int id);
void taosFreeId(void *handle, int id); void taosFreeId(void *handle, int id);
void taosIdPoolCleanUp(void *handle); void taosIdPoolCleanUp(void *handle);
......
...@@ -230,6 +230,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_PARTITONS, "Invalid topic partito ...@@ -230,6 +230,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_PARTITONS, "Invalid topic partito
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FORMAT, "Invalid format") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FORMAT, "Invalid format")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DUP_TID, "Duplicated tid")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ID_POOL_IS_FULL, "Id pool is full")
// dnode // dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed")
......
...@@ -14,8 +14,10 @@ ...@@ -14,8 +14,10 @@
*/ */
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "tulog.h" #include "tulog.h"
typedef struct { typedef struct {
int maxId; int maxId;
int numOfFree; int numOfFree;
...@@ -70,6 +72,34 @@ int taosAllocateId(void *handle) { ...@@ -70,6 +72,34 @@ int taosAllocateId(void *handle) {
return slot + 1; return slot + 1;
} }
int taosAssignId(void *handle, int id) {
id_pool_t *pIdPool = handle;
if (handle == NULL) {
return TSDB_CODE_MND_APP_ERROR;
}
int32_t code = 0;
pthread_mutex_lock(&pIdPool->mutex);
if (pIdPool->numOfFree > 0) {
if (id > 0 && id < pIdPool->maxId) {
if (false == pIdPool->freeList[id - 1]) {
pIdPool->freeList[id - 1] = true;
pIdPool->numOfFree--;
} else {
code = TSDB_CODE_MND_DUP_TID;
}
} else {
code = TSDB_CODE_MND_INVALID_FORMAT;
}
} else {
code = TSDB_CODE_MND_ID_POOL_IS_FULL;
}
pthread_mutex_unlock(&pIdPool->mutex);
return code;
}
void taosFreeId(void *handle, int id) { void taosFreeId(void *handle, int id) {
id_pool_t *pIdPool = handle; id_pool_t *pIdPool = handle;
if (handle == NULL) return; if (handle == NULL) return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册