提交 13cfafaf 编写于 作者: S Shengliang Guan

TD-10431 retrieve stable meta

上级 b6ed3056
......@@ -274,7 +274,7 @@ typedef struct {
typedef struct SSchema {
int8_t type;
int16_t colId;
int32_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
} SSchema;
......@@ -764,13 +764,17 @@ typedef struct {
} SAuthVnodeMsg;
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
int16_t createFlag;
char tags[];
char name[TSDB_TABLE_FNAME_LEN];
} SStableInfoMsg;
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
int8_t createFlag;
char tags[];
} STableInfoMsg;
typedef struct {
uint8_t metaClone; // create local clone of the cached table meta
int8_t metaClone; // create local clone of the cached table meta
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfUdfs;
......@@ -788,25 +792,24 @@ typedef struct {
} SVgroupMsg;
typedef struct {
int32_t numOfVgroups;
int32_t numOfVgroups;
SVgroupMsg vgroups[];
} SVgroupsMsg, SVgroupsInfo;
typedef struct STableMetaMsg {
int32_t contLen;
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; // table id
int8_t numOfTags;
char stableFname[TSDB_TABLE_FNAME_LEN];
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int16_t numOfColumns;
int16_t sversion;
int16_t tversion;
int32_t tid;
int64_t uid;
int8_t update;
int32_t sversion;
int32_t tversion;
uint64_t tuid;
uint64_t suid;
SVgroupMsg vgroup;
char sTableName[TSDB_TABLE_FNAME_LEN];
int64_t suid;
SSchema schema[];
SSchema pSchema[];
} STableMetaMsg;
typedef struct SMultiTableMeta {
......
......@@ -183,6 +183,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0348) //"Mnode already exists")
#define TSDB_CODE_MND_MNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0349) //"Mnode not there")
// mnode-table
#define TSDB_CODE_MND_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360) //"Table already exists")
#define TSDB_CODE_MND_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0361) //"Table name too long")
#define TSDB_CODE_MND_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0362) //"Table does not exist")
......
......@@ -93,21 +93,21 @@ TEST_F(DndTestCluster, ShowCluster) {
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema = &pMeta->pSchema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "id");
pSchema = &pMeta->schema[1];
pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "name");
pSchema = &pMeta->schema[2];
pSchema = &pMeta->pSchema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
......
......@@ -152,49 +152,49 @@ TEST_F(DndTestProfile, SConnectMsg_03) {
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema = &pMeta->pSchema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->schema[1];
pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "user");
pSchema = &pMeta->schema[2];
pSchema = &pMeta->pSchema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "program");
pSchema = &pMeta->schema[3];
pSchema = &pMeta->pSchema[3];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "pid");
pSchema = &pMeta->schema[4];
pSchema = &pMeta->pSchema[4];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "ip:port");
pSchema = &pMeta->schema[5];
pSchema = &pMeta->pSchema[5];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "login_time");
pSchema = &pMeta->schema[6];
pSchema = &pMeta->pSchema[6];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
......@@ -493,28 +493,28 @@ TEST_F(DndTestProfile, SKillQueryMsg_03) {
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema = &pMeta->pSchema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "queryId");
pSchema = &pMeta->schema[1];
pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->schema[2];
pSchema = &pMeta->pSchema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "user");
pSchema = &pMeta->schema[3];
pSchema = &pMeta->pSchema[3];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
......@@ -680,21 +680,21 @@ TEST_F(DndTestProfile, SKillStreamMsg_03) {
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema = &pMeta->pSchema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "streamId");
pSchema = &pMeta->schema[1];
pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->schema[2];
pSchema = &pMeta->pSchema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
......
......@@ -154,49 +154,49 @@ TEST_F(DndTestShow, SShowMsg_04) {
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema = &pMeta->pSchema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->schema[1];
pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "user");
pSchema = &pMeta->schema[2];
pSchema = &pMeta->pSchema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "program");
pSchema = &pMeta->schema[3];
pSchema = &pMeta->pSchema[3];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "pid");
pSchema = &pMeta->schema[4];
pSchema = &pMeta->pSchema[4];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "ip:port");
pSchema = &pMeta->schema[5];
pSchema = &pMeta->pSchema[5];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "login_time");
pSchema = &pMeta->schema[6];
pSchema = &pMeta->pSchema[6];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
......
......@@ -93,28 +93,28 @@ TEST_F(DndTestUser, ShowUser) {
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema = &pMeta->pSchema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "name");
pSchema = &pMeta->schema[1];
pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, 10 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "privilege");
pSchema = &pMeta->schema[2];
pSchema = &pMeta->pSchema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "create_time");
pSchema = &pMeta->schema[3];
pSchema = &pMeta->pSchema[3];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
......
......@@ -47,7 +47,6 @@ typedef struct SAcctObj SAcctObj;
typedef struct SUserObj SUserObj;
typedef struct SDbObj SDbObj;
typedef struct SVgObj SVgObj;
typedef struct STableObj STableObj;
typedef struct SFuncObj SFuncObj;
typedef struct SOperObj SOperObj;
......@@ -249,10 +248,10 @@ typedef struct SStableObj {
int64_t updateTime;
uint64_t uid;
int32_t version;
int16_t numOfFields;
int16_t numOfTags;
int32_t numOfColumns;
int32_t numOfTags;
SRWLatch lock;
SSchema *fieldSchema;
SSchema *columnSchema;
SSchema *tagSchema;
} SStableObj;
......
......@@ -138,7 +138,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......
......@@ -659,7 +659,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
......
......@@ -576,7 +576,7 @@ static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg) { mInfo("cfg dnode rsp
static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
......@@ -654,7 +654,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
......
......@@ -51,7 +51,7 @@ int32_t mndInitFunc(SMnode *pMnode) {
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndGetFuncMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndRetrieveFuncs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndCancelGetNextFunc);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -375,7 +375,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
......
......@@ -363,7 +363,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
......
......@@ -467,7 +467,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
mndReleaseUser(pMnode, pUser);
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......@@ -595,7 +595,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
mndReleaseUser(pMnode, pUser);
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......@@ -811,7 +811,7 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
mndReleaseUser(pMnode, pUser);
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......
......@@ -21,6 +21,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndDb.h"
#include "tname.h"
#define TSDB_STABLE_VER_NUM 1
#define TSDB_STABLE_RESERVE_SIZE 64
......@@ -68,7 +69,7 @@ int32_t mndInitStable(SMnode *pMnode) {
void mndCleanupStable(SMnode *pMnode) {}
static SSdbRaw *mndStableActionEncode(SStableObj *pStable) {
int32_t size = sizeof(SStableObj) + (pStable->numOfFields + pStable->numOfTags) * sizeof(SSchema);
int32_t size = sizeof(SStableObj) + (pStable->numOfColumns + pStable->numOfTags) * sizeof(SSchema);
SSdbRaw *pRaw = sdbAllocRaw(SDB_STABLE, TSDB_STABLE_VER_NUM, size);
if (pRaw == NULL) return NULL;
......@@ -78,14 +79,14 @@ static SSdbRaw *mndStableActionEncode(SStableObj *pStable) {
SDB_SET_INT64(pRaw, dataPos, pStable->updateTime)
SDB_SET_INT64(pRaw, dataPos, pStable->uid)
SDB_SET_INT64(pRaw, dataPos, pStable->version)
SDB_SET_INT16(pRaw, dataPos, pStable->numOfFields)
SDB_SET_INT16(pRaw, dataPos, pStable->numOfTags)
SDB_SET_INT32(pRaw, dataPos, pStable->numOfColumns)
SDB_SET_INT32(pRaw, dataPos, pStable->numOfTags)
for (int32_t i = 0; i < pStable->numOfFields; ++i) {
SSchema *pSchema = &pStable->fieldSchema[i];
for (int32_t i = 0; i < pStable->numOfColumns; ++i) {
SSchema *pSchema = &pStable->columnSchema[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type);
SDB_SET_INT16(pRaw, dataPos, pSchema->colId);
SDB_SET_INT16(pRaw, dataPos, pSchema->bytes);
SDB_SET_INT32(pRaw, dataPos, pSchema->colId);
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes);
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
......@@ -124,16 +125,16 @@ static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pStable->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStable->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStable->version)
SDB_GET_INT16(pRaw, pRow, dataPos, &pStable->numOfFields)
SDB_GET_INT16(pRaw, pRow, dataPos, &pStable->numOfTags)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStable->numOfColumns)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStable->numOfTags)
pStable->fieldSchema = calloc(pStable->numOfFields, sizeof(SSchema));
pStable->columnSchema = calloc(pStable->numOfColumns, sizeof(SSchema));
pStable->tagSchema = calloc(pStable->numOfTags, sizeof(SSchema));
for (int32_t i = 0; i < pStable->numOfFields; ++i) {
SSchema *pSchema = &pStable->fieldSchema[i];
for (int32_t i = 0; i < pStable->numOfColumns; ++i) {
SSchema *pSchema = &pStable->columnSchema[i];
SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type);
SDB_GET_INT16(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes);
SDB_GET_BINARY(pRaw, pRow, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
......@@ -141,7 +142,7 @@ static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < pStable->numOfTags; ++i) {
SSchema *pSchema = &pStable->tagSchema[i];
SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type);
SDB_GET_INT16(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes);
SDB_GET_BINARY(pRaw, pRow, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
......@@ -167,24 +168,34 @@ static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStable, SStable
atomic_exchange_32(&pOldStable->version, pNewStable->version);
taosWLockLatch(&pOldStable->lock);
int16_t numOfTags = pNewStable->numOfTags;
int32_t numOfTags = pNewStable->numOfTags;
int32_t tagSize = numOfTags * sizeof(SSchema);
int16_t numOfFields = pNewStable->numOfFields;
int32_t fieldSize = numOfFields * sizeof(SSchema);
int32_t numOfColumns = pNewStable->numOfColumns;
int32_t columnSize = numOfColumns * sizeof(SSchema);
if (pOldStable->numOfTags < numOfTags) {
pOldStable->tagSchema = malloc(tagSize);
}
if (pOldStable->numOfFields < numOfFields) {
pOldStable->fieldSchema = malloc(fieldSize);
if (pOldStable->numOfColumns < numOfColumns) {
pOldStable->columnSchema = malloc(columnSize);
}
memcpy(pOldStable->tagSchema, pNewStable->tagSchema, tagSize);
memcpy(pOldStable->fieldSchema, pNewStable->fieldSchema, fieldSize);
memcpy(pOldStable->columnSchema, pNewStable->columnSchema, columnSize);
taosWUnLockLatch(&pOldStable->lock);
return 0;
}
SStableObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_STABLE, stbName);
}
void mndReleaseStb(SMnode *pMnode, SStableObj *pStb) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pStb);
}
static int32_t mndProcessCreateStableMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCreateStableInRsp(SMnodeMsg *pMsg) { return 0; }
......@@ -197,7 +208,78 @@ static int32_t mndProcessDropStableMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropStableInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessStableMetaMsg(SMnodeMsg *pMsg) { return 0; }
static SDbObj *mndGetDbByStbName(SMnode *pMnode, char *stbName) {
SName name = {0};
tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_TABLE_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db);
return mndAcquireDb(pMnode, db);
}
static int32_t mndProcessStableMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SStableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
mDebug("stable:%s, start to retrieve meta", pInfo->name);
SDbObj *pDb = mndGetDbByStbName(pMnode, pInfo->name);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stable:%s, failed to retrieve meta since %s", pInfo->name, terrstr());
return -1;
}
SStableObj *pStb = mndAcquireStb(pMnode, pInfo->name);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_TABLE_NAME;
mError("stable:%s, failed to get meta since %s", pInfo->name, terrstr());
return -1;
}
int32_t contLen = sizeof(STableMetaMsg) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("stable:%s, failed to get meta since %s", pInfo->name, terrstr());
return -1;
}
memcpy(pMeta->stableFname, pStb->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pStb->numOfTags);
pMeta->numOfColumns = htonl(pStb->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pStb->version);
pMeta->suid = htonl(pStb->uid);
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pColumn = &pStb->columnSchema[i];
memcpy(pSchema->name, pColumn->name, TSDB_COL_NAME_LEN);
pSchema->type = pColumn->type;
pSchema->colId = htonl(pColumn->colId);
pSchema->bytes = htonl(pColumn->bytes);
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pMeta->pSchema[i + pStb->numOfColumns];
SSchema *pTag = &pStb->tagSchema[i];
memcpy(pSchema->name, pTag->name, TSDB_COL_NAME_LEN);
pSchema->type = pTag->type;
pSchema->colId = htons(pTag->colId);
pSchema->bytes = htonl(pTag->bytes);
}
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("stable:%s, meta is retrieved, cols:%d tags:%d", pInfo->name, pStb->numOfColumns, pStb->numOfTags);
return 0;
}
static int32_t mndGetNumOfStables(SMnode *pMnode, char *dbName, int32_t *pNumOfStables) {
SSdb *pSdb = pMnode->pSdb;
......@@ -235,7 +317,7 @@ static int32_t mndGetStableMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
......@@ -323,7 +405,7 @@ static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStable->numOfFields;
*(int16_t *)pWrite = pStable->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
......@@ -453,7 +453,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
......
......@@ -201,7 +201,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......@@ -307,7 +307,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......
......@@ -1480,7 +1480,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild) {
pTableMeta->tversion = pChild->tversion;
pTableMeta->sversion = pChild->sversion;
memcpy(pTableMeta->schema, pChild->schema, sizeof(SSchema) * total);
memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total);
int32_t num = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < num; ++i) {
......
......@@ -2312,14 +2312,14 @@ static void yy_reduce(
break;
case 26: /* cmd ::= SHOW dbPrefix STABLES */
{
setShowOptions(pInfo, TSDB_MGMT_TABLE_METRIC, &yymsp[-1].minor.yy0, 0);
setShowOptions(pInfo, TSDB_MGMT_TABLE_STABLE, &yymsp[-1].minor.yy0, 0);
}
break;
case 27: /* cmd ::= SHOW dbPrefix STABLES LIKE ids */
{
SToken token;
tSetDbName(&token, &yymsp[-3].minor.yy0);
setShowOptions(pInfo, TSDB_MGMT_TABLE_METRIC, &token, &yymsp[0].minor.yy0);
setShowOptions(pInfo, TSDB_MGMT_TABLE_STABLE, &token, &yymsp[0].minor.yy0);
}
break;
case 28: /* cmd ::= SHOW dbPrefix VGROUPS */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册