提交 17a3469a 编写于 作者: S Shengliang Guan

TD-10431 retrieve stable meta

上级 13cfafaf
......@@ -26,11 +26,11 @@
#define TSDB_STABLE_VER_NUM 1
#define TSDB_STABLE_RESERVE_SIZE 64
static SSdbRaw *mndStableActionEncode(SStableObj *pStable);
static SSdbRaw *mndStableActionEncode(SStableObj *pStb);
static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw);
static int32_t mndStableActionInsert(SSdb *pSdb, SStableObj *pStable);
static int32_t mndStableActionDelete(SSdb *pSdb, SStableObj *pStable);
static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStable, SStableObj *pNewStable);
static int32_t mndStableActionInsert(SSdb *pSdb, SStableObj *pStb);
static int32_t mndStableActionDelete(SSdb *pSdb, SStableObj *pStb);
static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStb, SStableObj *pNewStb);
static int32_t mndProcessCreateStableMsg(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStableMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropStableMsg(SMnodeMsg *pMsg);
......@@ -68,30 +68,30 @@ int32_t mndInitStable(SMnode *pMnode) {
void mndCleanupStable(SMnode *pMnode) {}
static SSdbRaw *mndStableActionEncode(SStableObj *pStable) {
int32_t size = sizeof(SStableObj) + (pStable->numOfColumns + pStable->numOfTags) * sizeof(SSchema);
static SSdbRaw *mndStableActionEncode(SStableObj *pStb) {
int32_t size = sizeof(SStableObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema);
SSdbRaw *pRaw = sdbAllocRaw(SDB_STABLE, TSDB_STABLE_VER_NUM, size);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pStable->name, TSDB_TABLE_NAME_LEN)
SDB_SET_INT64(pRaw, dataPos, pStable->createdTime)
SDB_SET_INT64(pRaw, dataPos, pStable->updateTime)
SDB_SET_INT64(pRaw, dataPos, pStable->uid)
SDB_SET_INT64(pRaw, dataPos, pStable->version)
SDB_SET_INT32(pRaw, dataPos, pStable->numOfColumns)
SDB_SET_INT32(pRaw, dataPos, pStable->numOfTags)
for (int32_t i = 0; i < pStable->numOfColumns; ++i) {
SSchema *pSchema = &pStable->columnSchema[i];
SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_NAME_LEN)
SDB_SET_INT64(pRaw, dataPos, pStb->createdTime)
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pStb->uid)
SDB_SET_INT64(pRaw, dataPos, pStb->version)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags)
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->columnSchema[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type);
SDB_SET_INT32(pRaw, dataPos, pSchema->colId);
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes);
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
for (int32_t i = 0; i < pStable->numOfTags; ++i) {
SSchema *pSchema = &pStable->tagSchema[i];
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->tagSchema[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type);
SDB_SET_INT32(pRaw, dataPos, pSchema->colId);
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes);
......@@ -116,31 +116,31 @@ static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw) {
int32_t size = sizeof(SStableObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size);
SStableObj *pStable = sdbGetRowObj(pRow);
if (pStable == NULL) return NULL;
SStableObj *pStb = sdbGetRowObj(pRow);
if (pStb == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pStable->name, TSDB_TABLE_NAME_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStable->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStable->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStable->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStable->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStable->numOfColumns)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStable->numOfTags)
pStable->columnSchema = calloc(pStable->numOfColumns, sizeof(SSchema));
pStable->tagSchema = calloc(pStable->numOfTags, sizeof(SSchema));
for (int32_t i = 0; i < pStable->numOfColumns; ++i) {
SSchema *pSchema = &pStable->columnSchema[i];
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->name, TSDB_TABLE_NAME_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfColumns)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfTags)
pStb->columnSchema = calloc(pStb->numOfColumns, sizeof(SSchema));
pStb->tagSchema = calloc(pStb->numOfTags, sizeof(SSchema));
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->columnSchema[i];
SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes);
SDB_GET_BINARY(pRaw, pRow, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
for (int32_t i = 0; i < pStable->numOfTags; ++i) {
SSchema *pSchema = &pStable->tagSchema[i];
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->tagSchema[i];
SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes);
......@@ -152,37 +152,37 @@ static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw) {
return pRow;
}
static int32_t mndStableActionInsert(SSdb *pSdb, SStableObj *pStable) {
mTrace("stable:%s, perform insert action", pStable->name);
static int32_t mndStableActionInsert(SSdb *pSdb, SStableObj *pStb) {
mTrace("stable:%s, perform insert action", pStb->name);
return 0;
}
static int32_t mndStableActionDelete(SSdb *pSdb, SStableObj *pStable) {
mTrace("stable:%s, perform delete action", pStable->name);
static int32_t mndStableActionDelete(SSdb *pSdb, SStableObj *pStb) {
mTrace("stable:%s, perform delete action", pStb->name);
return 0;
}
static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStable, SStableObj *pNewStable) {
mTrace("stable:%s, perform update action", pOldStable->name);
atomic_exchange_32(&pOldStable->updateTime, pNewStable->updateTime);
atomic_exchange_32(&pOldStable->version, pNewStable->version);
static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStb, SStableObj *pNewStb) {
mTrace("stable:%s, perform update action", pOldStb->name);
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
atomic_exchange_32(&pOldStb->version, pNewStb->version);
taosWLockLatch(&pOldStable->lock);
int32_t numOfTags = pNewStable->numOfTags;
taosWLockLatch(&pOldStb->lock);
int32_t numOfTags = pNewStb->numOfTags;
int32_t tagSize = numOfTags * sizeof(SSchema);
int32_t numOfColumns = pNewStable->numOfColumns;
int32_t numOfColumns = pNewStb->numOfColumns;
int32_t columnSize = numOfColumns * sizeof(SSchema);
if (pOldStable->numOfTags < numOfTags) {
pOldStable->tagSchema = malloc(tagSize);
if (pOldStb->numOfTags < numOfTags) {
pOldStb->tagSchema = malloc(tagSize);
}
if (pOldStable->numOfColumns < numOfColumns) {
pOldStable->columnSchema = malloc(columnSize);
if (pOldStb->numOfColumns < numOfColumns) {
pOldStb->columnSchema = malloc(columnSize);
}
memcpy(pOldStable->tagSchema, pNewStable->tagSchema, tagSize);
memcpy(pOldStable->columnSchema, pNewStable->columnSchema, columnSize);
taosWUnLockLatch(&pOldStable->lock);
memcpy(pOldStb->tagSchema, pNewStb->tagSchema, tagSize);
memcpy(pOldStb->columnSchema, pNewStb->columnSchema, columnSize);
taosWUnLockLatch(&pOldStb->lock);
return 0;
}
......@@ -293,15 +293,15 @@ static int32_t mndGetNumOfStables(SMnode *pMnode, char *dbName, int32_t *pNumOfS
int32_t numOfStables = 0;
void *pIter = NULL;
while (1) {
SStableObj *pStable = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pStable);
SStableObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pStb);
if (pIter == NULL) break;
if (strcmp(pStable->db, dbName) == 0) {
if (strcmp(pStb->db, dbName) == 0) {
numOfStables++;
}
sdbRelease(pSdb, pStable);
sdbRelease(pSdb, pStb);
}
*pNumOfStables = numOfStables;
......@@ -374,7 +374,7 @@ static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStableObj *pStable = NULL;
SStableObj *pStb = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[64] = {0};
......@@ -384,36 +384,36 @@ static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_STABLE, pShow->pIter, (void **)&pStable);
pShow->pIter = sdbFetch(pSdb, SDB_STABLE, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
if (strncmp(pStable->name, prefix, prefixLen) != 0) {
sdbRelease(pSdb, pStable);
if (strncmp(pStb->name, prefix, prefixLen) != 0) {
sdbRelease(pSdb, pStb);
continue;
}
cols = 0;
char stableName[TSDB_TABLE_FNAME_LEN] = {0};
memcpy(stableName, pStable->name + prefixLen, TSDB_TABLE_FNAME_LEN - prefixLen);
memcpy(stableName, pStb->name + prefixLen, TSDB_TABLE_FNAME_LEN - prefixLen);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, stableName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pStable->createdTime;
*(int64_t *)pWrite = pStb->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStable->numOfColumns;
*(int16_t *)pWrite = pStb->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStable->numOfTags;
*(int16_t *)pWrite = pStb->numOfTags;
cols++;
numOfRows++;
sdbRelease(pSdb, pStable);
sdbRelease(pSdb, pStb);
}
pShow->numOfReads += numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册