提交 a60fc0f7 编写于 作者: H Hongze Cheng

TD-353

上级 65552dc4
......@@ -298,7 +298,7 @@ STSchema* tsdbGetTableSchema(STable* pTable);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version);
STSchema* tsdbGetTableTagSchema(STable* pTable);
int tsdbUpdateTable(STsdbMeta* pMeta, STable* pTable, STableCfg* pCfg);
int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg);
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
......
......@@ -67,7 +67,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
ASSERT(pPool != NULL);
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024;
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0;
pPool->index = 0;
......@@ -106,6 +106,7 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) {
while ((pNode = tdListPopHead(pBufPool->bufBlockList)) != NULL) {
tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock);
free(pNode);
}
}
......
......@@ -67,6 +67,7 @@ static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
// Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
......@@ -81,7 +82,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
if (tsdbSetRepoEnv(rootDir, pCfg) < 0) return -1;
tsdbTrace(
"vgId%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d maxTables %d daysPerFile %d keep "
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d maxTables %d daysPerFile %d keep "
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d",
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep,
pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
......@@ -142,6 +143,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if (repo == NULL) return;
STsdbRepo *pRepo = (STsdbRepo *)repo;
int vgId = REPO_ID(pRepo);
if (toCommit) {
tsdbAsyncCommit(pRepo);
......@@ -151,7 +153,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
tsdbCloseFileH(pRepo);
tsdbCloseBufPool(pRepo);
tsdbCloseMeta(pRepo);
tsdbTrace("vgId:%d repository is closed", REPO_ID(pRepo));
tsdbFreeRepo(pRepo);
tsdbTrace("vgId:%d repository is closed", vgId);
}
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
......@@ -170,7 +173,6 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) {
pRsp->affectedRows = htonl(affectedrows);
return -1;
}
}
......@@ -534,7 +536,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
}
int tlen = tsdbEncodeCfg((void *)(&pBuf), pCfg);
ASSERT(tlen + sizeof(TSCKSUM) <= TSDB_FILE_HEAD_SIZE);
ASSERT((tlen + sizeof(TSCKSUM) <= TSDB_FILE_HEAD_SIZE) && (POINTER_DISTANCE(pBuf, buf) == tlen));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
......@@ -718,42 +720,11 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
return -1;
}
// Check schema version
int32_t tversion = pBlock->sversion;
STSchema *pSchema = tsdbGetTableSchema(pTable);
ASSERT(pSchema != NULL);
int16_t nversion = schemaVersion(pSchema);
if (tversion > nversion) {
tsdbTrace("vgId:%d table %s tid %d server schema version %d is older than clien version %d, try to config.",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), nversion, tversion);
void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable));
if (msg == NULL) return -1;
// TODO: Deal with error her
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
STable * pTableUpdate = NULL;
if (pTable->type == TSDB_CHILD_TABLE) {
pTableUpdate = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
} else {
pTableUpdate = pTable;
}
int32_t code = tsdbUpdateTable(pMeta, pTableUpdate, pTableCfg);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg);
pSchema = tsdbGetTableSchemaByVersion(pTable, tversion);
} else if (tversion < nversion) {
pSchema = tsdbGetTableSchemaByVersion(pTable, tversion);
if (pSchema == NULL) {
tsdbError("vgId:%d table %s tid %d invalid schema version %d from client", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), tversion);
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
tsdbError("vgId:%d failed to insert data to table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
SSubmitBlkIter blkIter = {0};
......@@ -777,6 +748,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
(*affectedrows)++;
points++;
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
......@@ -820,6 +793,7 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
}
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
// TODO
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pFGroup = NULL;
......@@ -943,6 +917,55 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
return buf;
}
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchema(pTable);
int sversion = schemaVersion(pSchema);
if (pBlock->sversion == sversion) return 0;
if (pBlock->sversion > sversion) { // need to config
tsdbTrace("vgId:%d table %s tid %d has version %d smaller than client version %d, try to config", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), sversion, pBlock->sversion);
if (pRepo->appH.configFunc) {
void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable));
if (msg == NULL) {
tsdbError("vgId:%d failed to config table %s tid %d since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), tstrerror(terrno));
return -1;
}
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
if (pTableCfg == NULL) {
rpcFreeCont(msg);
return -1;
}
if (tsdbUpdateTable(pRepo, (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable, pTableCfg) < 0) {
tsdbError("vgId:%d failed to update table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg);
return -1;
}
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg);
} else {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
} else {
if (tsdbGetTableSchemaByVersion(pTable, pBlock->sversion) == NULL) {
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
}
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
return 0;
}
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
// TODO
// STsdbCache *pCache = pRepo->tsdbCache;
......
......@@ -74,7 +74,7 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
// TODO
if (super->type != TSDB_SUPER_TABLE) return -1;
if (super->tableId.uid != pCfg->superUid) return -1;
tsdbUpdateTable(pMeta, super, pCfg);
tsdbUpdateTable(pRepo, super, pCfg);
}
}
......@@ -279,7 +279,7 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
ASSERT(super != NULL);
int32_t code = tsdbUpdateTable(pMeta, super, pTableCfg);
int32_t code = tsdbUpdateTable(pRepo, super, pTableCfg);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -447,16 +447,21 @@ STSchema *tsdbGetTableTagSchema(STable *pTable) {
}
}
int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) {
ASSERT(pTable->type != TSDB_CHILD_TABLE);
bool isChanged = false;
int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) {
// TODO: this function can only be called when there is no query and commit on this table
ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE);
bool changed = false;
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (pTable->type == TSDB_SUPER_TABLE) {
if (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema)) {
int32_t code = tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema);
if (code != TSDB_CODE_SUCCESS) return code;
if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) {
tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
}
isChanged = true;
changed = true;
}
STSchema *pTSchema = tsdbGetTableSchema(pTable);
......@@ -471,18 +476,19 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) {
pTable->schema[pTable->numOfSchemas - 1] = tSchema;
}
isChanged = true;
pMeta->maxRowBytes = MAX(pMeta->maxRowBytes, dataRowMaxBytesFromSchema(pCfg->schema));
pMeta->maxCols = MAX(pMeta->maxCols, schemaNCols(pCfg->schema));
changed = true;
}
if (isChanged) {
// TODO
// char *buf = malloc(1024 * 1024);
// tsdbEncodeTable(buf, pTable);
// // tsdbInsertMetaRecord(pMeta->mfh, pTable->tableId.uid, buf, bufLen);
// free(buf);
if (changed) {
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable);
void *buf = tsdbAllocBytes(pRepo, tlen);
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
}
return TSDB_CODE_SUCCESS;
return 0;
}
int tsdbWLockRepoMeta(STsdbRepo *pRepo) {
......
......@@ -309,9 +309,7 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
}
void taosHashCleanup(SHashObj *pHashObj) {
if (pHashObj == NULL || pHashObj->capacity <= 0) {
return;
}
if (pHashObj == NULL) return;
SHashNode *pNode, *pNext;
......
......@@ -58,25 +58,21 @@ int tdCreateKVStore(char *fname) {
if (fd < 0) {
uError("failed to open file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (tdInitKVStoreHeader(fd, fname) < 0) {
close(fd);
return -1;
}
if (tdInitKVStoreHeader(fd, fname) < 0) goto _err;
if (fsync(fd) < 0) {
uError("failed to fsync file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
close(fd);
return -1;
goto _err;
}
if (close(fd) < 0) {
uError("failed to close file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
goto _err;
}
return 0;
......
......@@ -39,8 +39,10 @@ void tdListEmpty(SList *list) {
}
void tdListFree(SList *list) {
tdListEmpty(list);
free(list);
if (list) {
tdListEmpty(list);
free(list);
}
}
void tdListPrependNode(SList *list, SListNode *node) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册