未验证 提交 53cf171d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2574 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -471,7 +471,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -471,7 +471,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
// in case of insert, redo parsing the sql string and build new submit data block for two reasons: // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
// 2. vnode may need the schema information along with submit block to update its local table schema. // 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT) { if (pCmd->command == TSDB_SQL_INSERT) {
tscDebug("%p redo parse sql string to build submit block", pSql); tscDebug("%p redo parse sql string to build submit block", pSql);
......
...@@ -4490,7 +4490,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4490,7 +4490,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->tid = htonl(pTableMeta->sid);
pUpdateMsg->uid = htobe64(pTableMeta->uid); pUpdateMsg->uid = htobe64(pTableMeta->uid);
pUpdateMsg->colId = htons(pTagsSchema->colId); pUpdateMsg->colId = htons(pTagsSchema->colId);
pUpdateMsg->type = htons(pTagsSchema->type); pUpdateMsg->type = pTagsSchema->type;
pUpdateMsg->bytes = htons(pTagsSchema->bytes); pUpdateMsg->bytes = htons(pTagsSchema->bytes);
pUpdateMsg->tversion = htons(pTableMeta->tversion); pUpdateMsg->tversion = htons(pTableMeta->tversion);
pUpdateMsg->numOfTags = htons(numOfTags); pUpdateMsg->numOfTags = htons(numOfTags);
......
...@@ -247,7 +247,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -247,7 +247,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
} else { } else {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
if (pCmd->command == TSDB_SQL_CONNECT) { if (pCmd->command == TSDB_SQL_CONNECT) {
rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
...@@ -260,7 +260,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -260,7 +260,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
// get table meta query will not retry, do nothing // get table meta query will not retry, do nothing
} else { } else {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
pSql->res.code = rpcMsg->code; // keep the previous error code pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) { if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
......
...@@ -579,9 +579,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo ...@@ -579,9 +579,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta); int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
for(int32_t j = 0; j < numOfCols; ++j) { for(int32_t j = 0; j < numOfCols; ++j) {
STColumn* pCol = (STColumn*) pDataBlock; STColumn* pCol = (STColumn*) pDataBlock;
pCol->colId = pSchema[j].colId; pCol->colId = htons(pSchema[j].colId);
pCol->type = pSchema[j].type; pCol->type = pSchema[j].type;
pCol->bytes = pSchema[j].bytes; pCol->bytes = htons(pSchema[j].bytes);
pCol->offset = 0; pCol->offset = 0;
pDataBlock += sizeof(STColumn); pDataBlock += sizeof(STColumn);
...@@ -663,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -663,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
} }
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize; int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) { while (dataBuf->nAllocSize < destSize) {
...@@ -691,7 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -691,7 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize); int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
pBlocks->tid = htonl(pBlocks->tid); pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid); pBlocks->uid = htobe64(pBlocks->uid);
......
...@@ -200,6 +200,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb inval ...@@ -200,6 +200,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb inval
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "tsdb need to reconfigure table")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
......
...@@ -203,8 +203,7 @@ typedef struct SSubmitBlk { ...@@ -203,8 +203,7 @@ typedef struct SSubmitBlk {
typedef struct SSubmitMsg { typedef struct SSubmitMsg {
SMsgHead header; SMsgHead header;
int32_t length; int32_t length;
int32_t compressed : 2; int32_t numOfBlocks;
int32_t numOfBlocks : 30;
SSubmitBlk blocks[]; SSubmitBlk blocks[];
} SSubmitMsg; } SSubmitMsg;
...@@ -285,7 +284,7 @@ typedef struct { ...@@ -285,7 +284,7 @@ typedef struct {
int32_t tid; int32_t tid;
int16_t tversion; int16_t tversion;
int16_t colId; int16_t colId;
int16_t type; int8_t type;
int16_t bytes; int16_t bytes;
int32_t tagValLen; int32_t tagValLen;
int16_t numOfTags; int16_t numOfTags;
......
...@@ -115,7 +115,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); ...@@ -115,7 +115,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
void tsdbStartStream(TSDB_REPO_T *repo); void tsdbStartStream(TSDB_REPO_T *repo);
......
...@@ -294,16 +294,62 @@ typedef struct { ...@@ -294,16 +294,62 @@ typedef struct {
#define TABLE_SUID(t) (t)->suid #define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey #define TABLE_LASTKEY(t) (t)->lastKey
static FORCE_INLINE STSchema *tsdbGetTableSchema(STable *pTable) { STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first void tsdbFreeMeta(STsdbMeta* pMeta);
STable *pSuper = pTable->pSuper; int tsdbOpenMeta(STsdbRepo* pRepo);
if (pSuper == NULL) return NULL; int tsdbCloseMeta(STsdbRepo* pRepo);
return pSuper->schema[pSuper->numOfSchemas - 1]; STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
} else if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) { STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version);
return pTable->schema[pTable->numOfSchemas - 1]; int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
void tsdbRefTable(STable* pTable);
void tsdbUnRefTable(STable* pTable);
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
return -1;
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
return 1;
} else { } else {
return NULL; return 0;
}
}
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) {
STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
if (lock) taosRLockLatch(&(pDTable->latch));
if (version < 0) { // get the latest version of schema
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
} else { // get the schema with version
void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*),
tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit;
}
pTSchema = *(STSchema**)ptr;
} }
ASSERT(pTSchema != NULL);
if (copy) {
if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
} else {
pSchema = pTSchema;
}
_exit:
if (lock) taosRUnLockLatch(&(pDTable->latch));
return pSchema;
}
static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
return tsdbGetTableSchemaImpl(pTable, false, false, -1);
} }
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
...@@ -318,19 +364,6 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { ...@@ -318,19 +364,6 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
} }
} }
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta);
int tsdbOpenMeta(STsdbRepo* pRepo);
int tsdbCloseMeta(STsdbRepo* pRepo);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version);
int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg);
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
void tsdbRefTable(STable* pTable);
void tsdbUnRefTable(STable* pTable);
// ------------------ tsdbBuffer.c // ------------------ tsdbBuffer.c
STsdbBufPool* tsdbNewBufPool(); STsdbBufPool* tsdbNewBufPool();
void tsdbFreeBufPool(STsdbBufPool* pBufPool); void tsdbFreeBufPool(STsdbBufPool* pBufPool);
......
...@@ -41,9 +41,9 @@ typedef struct { ...@@ -41,9 +41,9 @@ typedef struct {
} SSubmitBlkIter; } SSubmitBlkIter;
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;
int32_t len; int32_t len;
SSubmitBlk *pBlock; void * pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
...@@ -56,7 +56,7 @@ static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg); ...@@ -56,7 +56,7 @@ static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg);
static void tsdbFreeRepo(STsdbRepo *pRepo); static void tsdbFreeRepo(STsdbRepo *pRepo);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows); static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows);
static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter); static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbRestoreInfo(STsdbRepo *pRepo); static int tsdbRestoreInfo(STsdbRepo *pRepo);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
...@@ -68,6 +68,7 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup); ...@@ -68,6 +68,7 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup);
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
// Function declaration // Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
...@@ -164,6 +165,13 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * ...@@ -164,6 +165,13 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
STsdbRepo * pRepo = (STsdbRepo *)repo; STsdbRepo * pRepo = (STsdbRepo *)repo;
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
}
return -1;
}
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) { if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
...@@ -173,12 +181,14 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * ...@@ -173,12 +181,14 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
int32_t affectedrows = 0; int32_t affectedrows = 0;
TSKEY now = taosGetTimestamp(pRepo->config.precision); TSKEY now = taosGetTimestamp(pRepo->config.precision);
while (true) {
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { tsdbGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) { if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) {
return -1; return -1;
} }
} }
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
return 0; return 0;
} }
...@@ -263,7 +273,7 @@ void tsdbStartStream(TSDB_REPO_T *repo) { ...@@ -263,7 +273,7 @@ void tsdbStartStream(TSDB_REPO_T *repo) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) { if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchema(pTable)); tsdbGetTableSchemaImpl(pTable, false, false, -1));
} }
} }
} }
...@@ -694,17 +704,12 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { ...@@ -694,17 +704,12 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
return -1; return -1;
} }
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
pMsg->compressed = htonl(pMsg->compressed);
pIter->totalLen = pMsg->length; pIter->totalLen = pMsg->length;
pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE; pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1; return -1;
} else {
pIter->pBlock = pMsg->blocks;
} }
return 0; return 0;
...@@ -714,26 +719,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY ...@@ -714,26 +719,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
int64_t points = 0; int64_t points = 0;
STable *pTable = tsdbGetTableByUid(pMeta, pBlock->uid); STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_TID(pTable) != pBlock->tid) { ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
tsdbError("vgId:%d failed to insert data to table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
SDataRow row = NULL; SDataRow row = NULL;
...@@ -764,27 +751,23 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY ...@@ -764,27 +751,23 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
return 0; return 0;
} }
static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
SSubmitBlk *pBlock = pIter->pBlock; if (pIter->len == 0) {
if (pBlock == NULL) return NULL; pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding);
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen;
if (pIter->len >= pIter->totalLen) {
pIter->pBlock = NULL;
} else { } else {
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk)); SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
} }
return pBlock; if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
return 0;
} }
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
...@@ -969,42 +952,64 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { ...@@ -969,42 +952,64 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL); ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchema(pTable); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int sversion = schemaVersion(pSchema); int sversion = schemaVersion(pSchema);
if (pBlock->sversion == sversion) return 0; if (pBlock->sversion == sversion) {
if (pBlock->sversion > sversion) { // need to config return 0;
tsdbDebug("vgId:%d table %s tid %d has version %d smaller than client version %d, try to config", REPO_ID(pRepo), } else {
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), sversion, pBlock->sversion); if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema
if (pRepo->appH.configFunc) { terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable)); return -1;
if (msg == NULL) { }
tsdbError("vgId:%d failed to config table %s tid %d since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), }
TABLE_TID(pTable), tstrerror(terrno));
if (pBlock->sversion > sversion) { // may need to update table schema
if (pBlock->schemaLen > 0) {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0);
int numOfCols = pBlock->schemaLen / sizeof(STColumn);
STColumn *pTCol = (STColumn *)pBlock->data;
STSchemaBuilder schemaBuilder = {0};
if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1; return -1;
} }
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); for (int i = 0; i < numOfCols; i++) {
if (pTableCfg == NULL) { if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) {
rpcFreeCont(msg); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
} }
if (tsdbUpdateTable(pRepo, (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable, pTableCfg) < 0) { STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tsdbError("vgId:%d failed to update table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), if (pNSchema == NULL) {
tstrerror(terrno)); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbClearTableCfg(pTableCfg); tdDestroyTSchemaBuilder(&schemaBuilder);
rpcFreeCont(msg);
return -1; return -1;
} }
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg); tdDestroyTSchemaBuilder(&schemaBuilder);
tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true);
} else { } else {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE;
return -1; return -1;
} }
} else { } else {
if (tsdbGetTableSchemaByVersion(pTable, pBlock->sversion) == NULL) { ASSERT(pBlock->sversion >= 0);
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo), tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable)); pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
} }
...@@ -1013,7 +1018,64 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT ...@@ -1013,7 +1018,64 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT
} }
return 0; return 0;
} }
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
STsdbMeta * pMeta = pRepo->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
if (pBlock->tid <= 0 || pBlock->tid >= pRepo->config.maxTables) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
// TODO // TODO
......
...@@ -538,10 +538,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -538,10 +538,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
SCommitIter *pIter = iters + tid; SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue; if (pIter->pTable == NULL) continue;
taosRLockLatch(&(pIter->pTable->latch));
tsdbSetHelperTable(pHelper, pIter->pTable, pRepo); tsdbSetHelperTable(pHelper, pIter->pTable, pRepo);
if (pIter->pIter != NULL) { if (pIter->pIter != NULL) {
tdInitDataCols(pDataCols, tsdbGetTableSchema(pIter->pTable)); tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1));
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
int nLoop = 0; int nLoop = 0;
...@@ -557,6 +559,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -557,6 +559,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
ASSERT(rowsWritten != 0); ASSERT(rowsWritten != 0);
if (rowsWritten < 0) { if (rowsWritten < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno)); tstrerror(terrno));
...@@ -571,6 +574,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -571,6 +574,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
ASSERT(pDataCols->numOfRows == 0); ASSERT(pDataCols->numOfRows == 0);
} }
taosRUnLockLatch(&(pIter->pTable->latch));
// Move the last block to the new .l file if neccessary // Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
...@@ -680,10 +685,10 @@ static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIter ...@@ -680,10 +685,10 @@ static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIter
if (dataRowKey(row) > maxKey) break; if (dataRowKey(row) > maxKey) break;
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row));
if (pSchema == NULL) { if (pSchema == NULL) {
// TODO: deal with the error here // TODO: deal with the error here
ASSERT(false); ASSERT(0);
} }
} }
......
...@@ -29,10 +29,9 @@ static void tsdbOrgMeta(void *pHandle); ...@@ -29,10 +29,9 @@ static void tsdbOrgMeta(void *pHandle);
static char * getTagIndexKey(const void *pData); static char * getTagIndexKey(const void *pData);
static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper); static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper);
static void tsdbFreeTable(STable *pTable); static void tsdbFreeTable(STable *pTable);
static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema); static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, bool lock);
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx);
static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFromIdx, bool lock); static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFromIdx, bool lock);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid); static int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
static int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup); static int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
...@@ -76,7 +75,7 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { ...@@ -76,7 +75,7 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
// TODO // TODO
if (super->type != TSDB_SUPER_TABLE) return -1; if (super->type != TSDB_SUPER_TABLE) return -1;
if (super->tableId.uid != pCfg->superUid) return -1; if (super->tableId.uid != pCfg->superUid) return -1;
tsdbUpdateTable(pRepo, super, pCfg); // tsdbUpdateTable(pRepo, super, pCfg);
} }
} }
...@@ -84,10 +83,18 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { ...@@ -84,10 +83,18 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
if (table == NULL) goto _err; if (table == NULL) goto _err;
// Register to meta // Register to meta
tsdbWLockRepoMeta(pRepo);
if (newSuper) { if (newSuper) {
if (tsdbAddTableToMeta(pRepo, super, true) < 0) goto _err; if (tsdbAddTableToMeta(pRepo, super, true, false) < 0) {
tsdbUnlockRepoMeta(pRepo);
goto _err;
}
}
if (tsdbAddTableToMeta(pRepo, table, true, false) < 0) {
tsdbUnlockRepoMeta(pRepo);
goto _err;
} }
if (tsdbAddTableToMeta(pRepo, table, true) < 0) goto _err; tsdbUnlockRepoMeta(pRepo);
// Write to memtable action // Write to memtable action
int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0; int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0;
...@@ -255,7 +262,7 @@ _err: ...@@ -255,7 +262,7 @@ _err:
return NULL; return NULL;
} }
static int32_t colIdCompar(const void* left, const void* right) { static UNUSED_FUNC int32_t colIdCompar(const void* left, const void* right) {
int16_t colId = *(int16_t*) left; int16_t colId = *(int16_t*) left;
STColumn* p2 = (STColumn*) right; STColumn* p2 = (STColumn*) right;
...@@ -266,91 +273,118 @@ static int32_t colIdCompar(const void* left, const void* right) { ...@@ -266,91 +273,118 @@ static int32_t colIdCompar(const void* left, const void* right) {
return (colId < p2->colId)? -1:1; return (colId < p2->colId)? -1:1;
} }
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
STSchema * pNewSchema = NULL;
pMsg->uid = htobe64(pMsg->uid); pMsg->uid = htobe64(pMsg->uid);
pMsg->tid = htonl(pMsg->tid); pMsg->tid = htonl(pMsg->tid);
pMsg->tversion = htons(pMsg->tversion); pMsg->tversion = htons(pMsg->tversion);
pMsg->colId = htons(pMsg->colId); pMsg->colId = htons(pMsg->colId);
pMsg->type = htons(pMsg->type);
pMsg->bytes = htons(pMsg->bytes); pMsg->bytes = htons(pMsg->bytes);
pMsg->tagValLen = htonl(pMsg->tagValLen); pMsg->tagValLen = htonl(pMsg->tagValLen);
pMsg->numOfTags = htons(pMsg->numOfTags); pMsg->numOfTags = htons(pMsg->numOfTags);
pMsg->schemaLen = htonl(pMsg->schemaLen); pMsg->schemaLen = htonl(pMsg->schemaLen);
assert(pMsg->schemaLen == sizeof(STColumn) * pMsg->numOfTags); for (int i = 0; i < pMsg->numOfTags; i++) {
STColumn *pTCol = (STColumn *)pMsg->data + i;
char* d = pMsg->data; pTCol->bytes = htons(pTCol->bytes);
for(int32_t i = 0; i < pMsg->numOfTags; ++i) { pTCol->colId = htons(pTCol->colId);
STColumn* pCol = (STColumn*) d;
pCol->colId = htons(pCol->colId);
pCol->bytes = htons(pCol->bytes);
pCol->offset = 0;
d += sizeof(STColumn);
} }
STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid); STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid);
if (pTable == NULL) { if (pTable == NULL || TABLE_TID(pTable) != pMsg->tid) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; tsdbError("vgId:%d failed to update table tag value since invalid table id %d uid %" PRIu64, REPO_ID(pRepo),
return -1; pMsg->tid, pMsg->uid);
}
if (TABLE_TID(pTable) != pMsg->tid) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1; return -1;
} }
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", REPO_ID(pRepo), tsdbError("vgId:%d try to update tag value of a non-child table, invalid action", REPO_ID(pRepo));
TABLE_CHAR_NAME(pTable), TABLE_TYPE(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION; terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1; return -1;
} }
if (schemaVersion(tsdbGetTableTagSchema(pTable)) < pMsg->tversion) { if (schemaVersion(pTable->pSuper->tagSchema) > pMsg->tversion) {
tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo), tsdbError(
schemaVersion(tsdbGetTableTagSchema(pTable)), pMsg->tversion); "vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pMsg->tid); "version %d",
if (msg == NULL) return -1; REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema));
terrno = TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
return -1;
}
// Deal with error her if (schemaVersion(pTable->pSuper->tagSchema) < pMsg->tversion) { // tag schema out of data,
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); tsdbDebug("vgId:%d need to update tag schema of table %s tid %d uid %" PRIu64
STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid); " since out of date, current version %d new version %d",
ASSERT(super != NULL); REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
schemaVersion(pTable->pSuper->tagSchema), pMsg->tversion);
int32_t code = tsdbUpdateTable(pRepo, super, pTableCfg); STSchemaBuilder schemaBuilder = {0};
if (code != TSDB_CODE_SUCCESS) {
tsdbClearTableCfg(pTableCfg); STColumn *pTCol = (STColumn *)pMsg->data;
return code; ASSERT(pMsg->schemaLen % sizeof(STColumn) == 0 && pTCol[0].colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
if (tdInitTSchemaBuilder(&schemaBuilder, pMsg->tversion) < 0) {
tsdbDebug("vgId:%d failed to update tag schema of table %s tid %d uid %" PRIu64 " since out of memory",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable));
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
} }
tsdbClearTableCfg(pTableCfg); for (int i = 0; i < (pMsg->schemaLen / sizeof(STColumn)); i++) {
rpcFreeCont(msg); if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, pTCol[i].colId, pTCol[i].bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
pNewSchema = tdGetSchemaFromBuilder(&schemaBuilder);
if (pNewSchema == NULL) {
tdDestroyTSchemaBuilder(&schemaBuilder);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tdDestroyTSchemaBuilder(&schemaBuilder);
} }
STSchema *pTagSchema = tsdbGetTableTagSchema(pTable); // Chage in memory
if (pNewSchema != NULL) { // change super table tag schema
if (schemaVersion(pTagSchema) > pMsg->tversion) { taosWLockLatch(&(pTable->pSuper->latch));
tsdbError( STSchema *pOldSchema = pTable->pSuper->tagSchema;
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag " pTable->pSuper->tagSchema = pNewSchema;
"version %d", tdFreeSchema(pOldSchema);
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema)); taosWUnLockLatch(&(pTable->pSuper->latch));
return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
} }
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
// STColumn *pCol = bsearch(&(pMsg->colId), pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
// ASSERT(pCol != NULL);
if (isChangeIndexCol) {
tsdbWLockRepoMeta(pRepo);
tsdbRemoveTableFromIndex(pMeta, pTable); tsdbRemoveTableFromIndex(pMeta, pTable);
} }
// TODO: remove table from index if it is the first column of tag taosWLockLatch(&(pTable->latch));
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
// TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId taosWUnLockLatch(&(pTable->latch));
STColumn* res = bsearch(&pMsg->colId, pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar); if (isChangeIndexCol) {
assert(res != NULL); tsdbAddTableIntoIndex(pMeta, pTable, false);
tsdbUnlockRepoMeta(pRepo);
}
tdSetKVRowDataOfCol(&pTable->tagVal, pMsg->colId, res->type, pMsg->data + pMsg->schemaLen); // Update on file
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) { int tlen1 = (pNewSchema) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable->pSuper) : 0;
tsdbAddTableIntoIndex(pMeta, pTable); int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable);
void *buf = tsdbAllocBytes(pRepo, tlen1+tlen2);
ASSERT(buf != NULL);
if (pNewSchema) {
void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable->pSuper);
ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1);
buf = pBuf;
} }
return TSDB_CODE_SUCCESS; tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
return 0;
} }
// ------------------ INTERNAL FUNCTIONS ------------------ // ------------------ INTERNAL FUNCTIONS ------------------
...@@ -460,56 +494,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { ...@@ -460,56 +494,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
} }
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) { STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) {
STable *pSearchTable = (pTable->type == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; return tsdbGetTableSchemaImpl(pTable, true, false, version);
if (pSearchTable == NULL) return NULL;
void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *),
tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) return NULL;
return *(STSchema **)ptr;
}
int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) {
// TODO: this function can only be called when there is no query and commit on this table
ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE);
bool changed = false;
STsdbMeta *pMeta = pRepo->tsdbMeta;
if ((pTable->type == TSDB_SUPER_TABLE) && (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema))) {
if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) {
tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
changed = true;
}
STSchema *pTSchema = tsdbGetTableSchema(pTable);
if (schemaVersion(pTSchema) < schemaVersion(pCfg->schema)) {
if (pTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
pTable->schema[pTable->numOfSchemas++] = tdDupSchema(pCfg->schema);
} else {
ASSERT(pTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
STSchema *tSchema = tdDupSchema(pCfg->schema);
tdFreeSchema(pTable->schema[0]);
memmove(pTable->schema, pTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
pTable->schema[pTable->numOfSchemas - 1] = tSchema;
}
pMeta->maxRowBytes = MAX(pMeta->maxRowBytes, dataRowMaxBytesFromSchema(pCfg->schema));
pMeta->maxCols = MAX(pMeta->maxCols, schemaNCols(pCfg->schema));
changed = true;
}
if (changed) {
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable);
void *buf = tsdbAllocBytes(pRepo, tlen);
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
}
return 0;
} }
int tsdbWLockRepoMeta(STsdbRepo *pRepo) { int tsdbWLockRepoMeta(STsdbRepo *pRepo) {
...@@ -553,7 +538,7 @@ void tsdbRefTable(STable *pTable) { ...@@ -553,7 +538,7 @@ void tsdbRefTable(STable *pTable) {
void tsdbUnRefTable(STable *pTable) { void tsdbUnRefTable(STable *pTable) {
int32_t ref = T_REF_DEC(pTable); int32_t ref = T_REF_DEC(pTable);
tsdbTrace("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
if (ref == 0) { if (ref == 0) {
// tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable)); // tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable));
...@@ -565,17 +550,36 @@ void tsdbUnRefTable(STable *pTable) { ...@@ -565,17 +550,36 @@ void tsdbUnRefTable(STable *pTable) {
} }
} }
// ------------------ LOCAL FUNCTIONS ------------------ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) {
static int tsdbCompareSchemaVersion(const void *key1, const void *key2) { ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE);
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { STsdbMeta *pMeta = pRepo->tsdbMeta;
return -1;
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
return 1; ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1]));
taosWLockLatch(&(pCTable->latch));
if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
pCTable->schema[pCTable->numOfSchemas++] = pSchema;
} else { } else {
return 0; ASSERT(pCTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
tdFreeSchema(pCTable->schema[0]);
memmove(pCTable->schema, pCTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
pCTable->schema[pCTable->numOfSchemas - 1] = pSchema;
}
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
taosWUnLockLatch(&(pCTable->latch));
if (insertAct) {
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable);
void *buf = tsdbAllocBytes(pRepo, tlen);
ASSERT(buf != NULL);
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
} }
} }
// ------------------ LOCAL FUNCTIONS ------------------
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
STsdbRepo *pRepo = (STsdbRepo *)pHandle; STsdbRepo *pRepo = (STsdbRepo *)pHandle;
STable * pTable = NULL; STable * pTable = NULL;
...@@ -587,7 +591,7 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { ...@@ -587,7 +591,7 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
tsdbDecodeTable(cont, &pTable); tsdbDecodeTable(cont, &pTable);
if (tsdbAddTableToMeta(pRepo, pTable, false) < 0) { if (tsdbAddTableToMeta(pRepo, pTable, false, false) < 0) {
tsdbFreeTable(pTable); tsdbFreeTable(pTable);
return -1; return -1;
} }
...@@ -605,7 +609,7 @@ static void tsdbOrgMeta(void *pHandle) { ...@@ -605,7 +609,7 @@ static void tsdbOrgMeta(void *pHandle) {
for (int i = 1; i < pCfg->maxTables; i++) { for (int i = 1; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) { if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable, true);
} }
} }
} }
...@@ -715,7 +719,7 @@ _err: ...@@ -715,7 +719,7 @@ _err:
static void tsdbFreeTable(STable *pTable) { static void tsdbFreeTable(STable *pTable) {
if (pTable) { if (pTable) {
tsdbDebug("table %s is destroyed", TABLE_CHAR_NAME(pTable)); if (pTable->name != NULL) tsdbDebug("table %s is destroyed", TABLE_CHAR_NAME(pTable));
tfree(TABLE_NAME(pTable)); tfree(TABLE_NAME(pTable));
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) { for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) {
...@@ -735,25 +739,10 @@ static void tsdbFreeTable(STable *pTable) { ...@@ -735,25 +739,10 @@ static void tsdbFreeTable(STable *pTable) {
} }
} }
static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) { static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, bool lock) {
ASSERT(pTable->type == TSDB_SUPER_TABLE);
ASSERT(schemaVersion(pTable->tagSchema) < schemaVersion(newSchema));
STSchema *pOldSchema = pTable->tagSchema;
STSchema *pNewSchema = tdDupSchema(newSchema);
if (pNewSchema == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pTable->tagSchema = pNewSchema;
tdFreeSchema(pOldSchema);
return 0;
}
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
if (addIdx && tsdbWLockRepoMeta(pRepo) < 0) { if (lock && tsdbWLockRepoMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno)); tstrerror(terrno));
return -1; return -1;
...@@ -768,7 +757,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { ...@@ -768,7 +757,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
} }
} else { } else {
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
if (tsdbAddTableIntoIndex(pMeta, pTable) < 0) { if (tsdbAddTableIntoIndex(pMeta, pTable, true) < 0) {
tsdbDebug("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo), tsdbDebug("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), tstrerror(terrno)); TABLE_CHAR_NAME(pTable), tstrerror(terrno));
goto _err; goto _err;
...@@ -787,14 +776,15 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { ...@@ -787,14 +776,15 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
} }
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
STSchema *pSchema = tsdbGetTableSchema(pTable); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema); if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema); if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
} }
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1; if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) { if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, tsdbGetTableSchema(pTable)); pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
} }
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
...@@ -803,7 +793,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { ...@@ -803,7 +793,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
_err: _err:
tsdbRemoveTableFromMeta(pRepo, pTable, false, false); tsdbRemoveTableFromMeta(pRepo, pTable, false, false);
if (addIdx) tsdbUnlockRepoMeta(pRepo); if (lock) tsdbUnlockRepoMeta(pRepo);
return -1; return -1;
} }
...@@ -814,7 +804,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro ...@@ -814,7 +804,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
STable * tTable = NULL; STable * tTable = NULL;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
STSchema *pSchema = tsdbGetTableSchema(pTable); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int maxCols = schemaNCols(pSchema); int maxCols = schemaNCols(pSchema);
int maxRowBytes = schemaTLen(pSchema); int maxRowBytes = schemaTLen(pSchema);
...@@ -848,7 +838,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro ...@@ -848,7 +838,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
for (int i = 0; i < pCfg->maxTables; i++) { for (int i = 0; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable != NULL) { if (pTable != NULL) {
pSchema = tsdbGetTableSchema(pTable); pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
maxCols = MAX(maxCols, schemaNCols(pSchema)); maxCols = MAX(maxCols, schemaNCols(pSchema));
maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema)); maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema));
} }
...@@ -860,7 +850,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro ...@@ -860,7 +850,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
tsdbUnRefTable(pTable); tsdbUnRefTable(pTable);
} }
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper) {
ASSERT(pTable->type == TSDB_CHILD_TABLE && pTable != NULL); ASSERT(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
STable *pSTable = tsdbGetTableByUid(pMeta, TABLE_SUID(pTable)); STable *pSTable = tsdbGetTableByUid(pMeta, TABLE_SUID(pTable));
ASSERT(pSTable != NULL); ASSERT(pSTable != NULL);
...@@ -884,7 +874,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { ...@@ -884,7 +874,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *)); memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *));
tSkipListPut(pSTable->pIndex, pNode); tSkipListPut(pSTable->pIndex, pNode);
T_REF_INC(pSTable); if (refSuper) T_REF_INC(pSTable);
return 0; return 0;
} }
...@@ -1252,4 +1242,4 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) { ...@@ -1252,4 +1242,4 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
} }
return 0; return 0;
} }
\ No newline at end of file
...@@ -218,7 +218,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { ...@@ -218,7 +218,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
pHelper->tableInfo.tid = pTable->tableId.tid; pHelper->tableInfo.tid = pTable->tableId.tid;
pHelper->tableInfo.uid = pTable->tableId.uid; pHelper->tableInfo.uid = pTable->tableId.uid;
STSchema *pSchema = tsdbGetTableSchema(pTable); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pHelper->tableInfo.sversion = schemaVersion(pSchema); pHelper->tableInfo.sversion = schemaVersion(pSchema);
tdInitDataCols(pHelper->pDataCols[0], pSchema); tdInitDataCols(pHelper->pDataCols[0], pSchema);
......
...@@ -162,7 +162,7 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -162,7 +162,7 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
} }
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
if (tsdbUpdateTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont) < 0) { if (tsdbUpdateTableTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont) < 0) {
return terrno; return terrno;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册