提交 602f4509 编写于 作者: S Shengliang Guan

TD-10431 process stable meta msg

上级 ff645300
......@@ -785,12 +785,8 @@ typedef struct {
} SAuthVnodeMsg;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
} SStbInfoMsg;
typedef struct {
SMsgHead msgHead;
char tableFname[TSDB_TABLE_FNAME_LEN];
int32_t vgId;
char tableFname[TSDB_TABLE_FNAME_LEN];
} STableInfoMsg;
typedef struct {
......@@ -801,10 +797,6 @@ typedef struct {
char tableNames[];
} SMultiTableInfoMsg;
typedef struct SSTableVgroupMsg {
int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct SVgroupInfo {
int32_t vgId;
uint32_t hashBegin;
......@@ -814,12 +806,6 @@ typedef struct SVgroupInfo {
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo;
typedef struct SVgroupListRspMsg {
int32_t vgroupNum;
int32_t vgroupVersion;
SVgroupInfo vgroupInfo[];
} SVgroupListRspMsg;
typedef struct {
int32_t vgId;
int8_t numOfEps;
......@@ -841,8 +827,8 @@ typedef struct {
int8_t update;
int32_t sversion;
int32_t tversion;
uint64_t tuid;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
SSchema pSchema[];
} STableMetaMsg;
......
......@@ -23,8 +23,6 @@ extern "C" {
#include "tarray.h"
#include "thash.h"
typedef SVgroupListRspMsg SVgroupListInfo;
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
......
......@@ -38,7 +38,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLE_META] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLE_META] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLES_META] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg;
......
......@@ -179,7 +179,7 @@ SServer* DndTestStb::pServer;
SClient* DndTestStb::pClient;
int32_t DndTestStb::connId;
TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) {
TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
strcpy(pReq->db, "1.d1");
......@@ -215,8 +215,8 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) {
}
{
int32_t tags = 2;
int32_t cols = 3;
int32_t cols = 2;
int32_t tags = 3;
int32_t size = (tags + cols) * sizeof(SSchema) + sizeof(SCreateStbMsg);
SCreateStbMsg* pReq = (SCreateStbMsg*)rpcMallocCont(size);
......@@ -266,7 +266,7 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) {
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateStbMsg);
rpcMsg.contLen = size;
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_STB;
sendMsg(pClient, &rpcMsg);
......@@ -274,65 +274,69 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) {
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
taosMsleep(10000);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show stables", 4, NULL);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time");
CheckSchema(2, TSDB_DATA_TYPE_INT, 2, "columns");
CheckSchema(3, TSDB_DATA_TYPE_INT, 2, "tags");
CheckSchema(2, TSDB_DATA_TYPE_INT, 4, "columns");
CheckSchema(3, TSDB_DATA_TYPE_INT, 4, "tags");
SendThenCheckShowRetrieveMsg(1);
CheckBinary("stb", TSDB_DB_NAME_LEN - 1);
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
CheckTimestamp();
CheckInt16(2);
CheckInt16(3);
CheckInt32(2);
CheckInt32(3);
#if 0
// ----- meta ------
{
SAlterDbMsg* pReq = (SAlterDbMsg*)rpcMallocCont(sizeof(SAlterDbMsg));
strcpy(pReq->db, "1.d1");
pReq->totalBlocks = htonl(12);
pReq->daysToKeep0 = htonl(300);
pReq->daysToKeep1 = htonl(400);
pReq->daysToKeep2 = htonl(500);
pReq->fsyncPeriod = htonl(4000);
pReq->walLevel = 2;
pReq->quorum = 2;
pReq->cacheLastRow = 1;
STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(sizeof(STableInfoMsg));
strcpy(pReq->tableFname, "1.d1.stb");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_DB;
rpcMsg.contLen = sizeof(STableInfoMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_TABLE_META;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1);
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp();
CheckInt16(2); // vgroups
CheckInt16(1); // replica
CheckInt16(2); // quorum
CheckInt16(10); // days
CheckBinary("300,400,500", 24); // days
CheckInt32(16); // cache
CheckInt32(12); // blocks
CheckInt32(100); // minrows
CheckInt32(4096); // maxrows
CheckInt8(2); // wallevel
CheckInt32(4000); // fsync
CheckInt8(2); // comp
CheckInt8(1); // cachelast
CheckBinary("ms", 3); // precision
CheckInt8(0); // update
#endif
STableMetaMsg* pRsp = (STableMetaMsg*)pMsg->pCont;
pRsp->numOfTags = htonl(pRsp->numOfTags);
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
pRsp->sversion = htonl(pRsp->sversion);
pRsp->tversion = htonl(pRsp->tversion);
pRsp->suid = htobe64(pRsp->suid);
pRsp->tuid = htobe64(pRsp->tuid);
pRsp->vgId = htobe64(pRsp->vgId);
for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
SSchema* pSchema = &pRsp->pSchema[i];
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
}
EXPECT_STREQ(pRsp->tbFname, "");
EXPECT_STREQ(pRsp->stbFname, "1.d1.stb");
EXPECT_EQ(pRsp->numOfColumns, 2);
EXPECT_EQ(pRsp->numOfTags, 3);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE);
EXPECT_EQ(pRsp->update, 0);
EXPECT_EQ(pRsp->sversion, 1);
EXPECT_EQ(pRsp->tversion, 0);
EXPECT_GT(pRsp->suid, 0);
EXPECT_EQ(pRsp->tuid, 0);
EXPECT_EQ(pRsp->vgId, 0);
{
SSchema* pSchema = &pRsp->pSchema[0];
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "ts");
}
}
// restart
stopServer(pServer);
......@@ -346,19 +350,13 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) {
uInfo("all server is running");
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show stables", 4, NULL);
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time");
CheckSchema(2, TSDB_DATA_TYPE_INT, 2, "columns");
CheckSchema(3, TSDB_DATA_TYPE_INT, 2, "tags");
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
SendThenCheckShowRetrieveMsg(1);
CheckBinary("stb", TSDB_DB_NAME_LEN - 1);
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
CheckTimestamp();
CheckInt16(2);
CheckInt16(3);
CheckInt32(2);
CheckInt32(3);
#if 0
{
SDropStbMsg* pReq = (SDropStbMsg*)rpcMallocCont(sizeof(SDropStbMsg));
strcpy(pReq->name, "1.d1.stb");
......@@ -374,106 +372,6 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) {
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show stables", 4, NULL);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
SendThenCheckShowRetrieveMsg(0);
#endif
}
#if 0
TEST_F(DndTestStb, 03_Create_Use_Restart_Use_Db) {
{
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
strcpy(pReq->db, "1.d2");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRowsPerFileBlock = htonl(100);
pReq->maxRowsPerFileBlock = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replications = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->ignoreExist = 1;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1);
CheckBinary("d2", TSDB_DB_NAME_LEN - 1);
{
SUseDbMsg* pReq = (SUseDbMsg*)rpcMallocCont(sizeof(SUseDbMsg));
strcpy(pReq->db, "1.d2");
pReq->vgVersion = htonl(-1);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SUseDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_USE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont;
EXPECT_STREQ(pRsp->db, "1.d2");
pRsp->vgVersion = htonl(pRsp->vgVersion);
pRsp->vgNum = htonl(pRsp->vgNum);
pRsp->hashMethod = pRsp->hashMethod;
EXPECT_EQ(pRsp->vgVersion, 1);
EXPECT_EQ(pRsp->vgNum, 2);
EXPECT_EQ(pRsp->hashMethod, 1);
{
SVgroupInfo* pInfo = &pRsp->vgroupInfo[0];
pInfo->vgId = htonl(pInfo->vgId);
pInfo->hashBegin = htonl(pInfo->hashBegin);
pInfo->hashEnd = htonl(pInfo->hashEnd);
EXPECT_GT(pInfo->vgId, 0);
EXPECT_EQ(pInfo->hashBegin, 0);
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1);
EXPECT_EQ(pInfo->inUse, 0);
EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9101);
EXPECT_STREQ(pAddr->fqdn, "localhost");
}
{
SVgroupInfo* pInfo = &pRsp->vgroupInfo[1];
pInfo->vgId = htonl(pInfo->vgId);
pInfo->hashBegin = htonl(pInfo->hashBegin);
pInfo->hashEnd = htonl(pInfo->hashEnd);
EXPECT_GT(pInfo->vgId, 0);
EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2);
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX);
EXPECT_EQ(pInfo->inUse, 0);
EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9101);
EXPECT_STREQ(pAddr->fqdn, "localhost");
}
}
}
#endif
\ No newline at end of file
......@@ -80,7 +80,7 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pStb->uid)
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid)
SDB_SET_INT64(pRaw, dataPos, pStb->version)
SDB_SET_INT32(pRaw, dataPos, pStb->version)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags)
......@@ -487,31 +487,37 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SStbInfoMsg *pInfo = pMsg->rpcMsg.pCont;
SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->name);
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->name);
SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to retrieve meta since %s", pInfo->name, terrstr());
mError("stb:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, pInfo->name);
SStbObj *pStb = mndAcquireStb(pMnode, pInfo->tableFname);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB;
mError("stb:%s, failed to get meta since %s", pInfo->name, terrstr());
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
int32_t contLen = sizeof(STableMetaMsg) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema);
taosRLockLatch(&pStb->lock);
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("stb:%s, failed to get meta since %s", pInfo->name, terrstr());
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
......@@ -524,7 +530,7 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
pMeta->sversion = htonl(pStb->version);
pMeta->suid = htonl(pStb->uid);
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pStb->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
......@@ -532,11 +538,14 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->name, pStb->numOfColumns, pStb->numOfTags);
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
return 0;
}
......@@ -553,7 +562,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
void *pIter = NULL;
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pStb);
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
if (pIter == NULL) break;
if (strcmp(pStb->db, dbName) == 0) {
......@@ -610,6 +619,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_STB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
......@@ -653,8 +663,8 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
cols = 0;
char stbName[TSDB_TABLE_FNAME_LEN] = {0};
memcpy(stbName, pStb->name + prefixLen, TSDB_TABLE_FNAME_LEN - prefixLen);
char stbName[TSDB_TABLE_NAME_LEN] = {0};
tstrncpy(stbName, pStb->name + prefixLen, TSDB_TABLE_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, stbName);
cols++;
......@@ -664,11 +674,11 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStb->numOfColumns;
*(int32_t *)pWrite = pStb->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStb->numOfTags;
*(int32_t *)pWrite = pStb->numOfTags;
cols++;
numOfRows++;
......
......@@ -39,7 +39,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
STableInfoMsg *bMsg = (STableInfoMsg *)*msg;
bMsg->msgHead.vgId = bInput->vgId;
bMsg->vgId = bInput->vgId;
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册