diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 9a6045396a65f4c27b976de2d3390dcf05385e8c..eb457df62449cbaef273cf89b34347c4fec32de9 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -178,7 +178,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "vnode app // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, 0, 0x0601, "tsdb invalid table type") -TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, 0, 0x0603, "tsdb table already exist") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, 0, 0x0604, "tsdb invalid configuration") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INIT_FAILED, 0, 0x0605, "tsdb init failed") diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 022923e4b2316d7f5fb9838a0a8d55e3c965bdee..c783b91ec140cff920a5eb67d1534c2da1cf4388 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -84,15 +84,10 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { goto _err; } - // // Restore key from file - // if (tsdbRestoreInfo(pRepo) < 0) { - // tsdbFreeCache(pRepo->tsdbCache); - // tsdbFreeMeta(pRepo->tsdbMeta); - // tsdbCloseFileH(pRepo->tsdbFileH); - // free(pRepo->rootDir); - // free(pRepo); - // return NULL; - // } + if (tsdbRestoreInfo(pRepo) < 0) { + tsdbError("vgId:%d failed to restore info from file since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } // pRepo->state = TSDB_REPO_STATE_ACTIVE; @@ -106,57 +101,28 @@ _err: return NULL; } -int32_t tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { - // TODO - // STsdbRepo *pRepo = (STsdbRepo *)repo; - // if (pRepo == NULL) return 0; - // int id = pRepo->config.tsdbId; - - // pRepo->state = TSDB_REPO_STATE_CLOSED; - // tsdbLockRepo(repo); - // if (pRepo->commit) { - // tsdbUnLockRepo(repo); - // return -1; - // } - // pRepo->commit = 1; - // // Loop to move pData to iData - // for (int i = 1; i < pRepo->config.maxTables; i++) { - // STable *pTable = pRepo->tsdbMeta->tables[i]; - // if (pTable != NULL && pTable->mem != NULL) { - // pTable->imem = pTable->mem; - // pTable->mem = NULL; - // } - // } - // // TODO: Loop to move mem to imem - // pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; - // pRepo->tsdbCache->mem = NULL; - // pRepo->tsdbCache->curBlock = NULL; - // tsdbUnLockRepo(repo); - - // if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); - // if (toCommit) tsdbCommitData((void *)repo); - - // tsdbCloseFileH(pRepo->tsdbFileH); - - // tsdbFreeMeta(pRepo->tsdbMeta); - - // tsdbFreeCache(pRepo->tsdbCache); - - // tfree(pRepo->rootDir); - // tfree(pRepo); - - // tsdbTrace("vgId:%d repository is closed!", id); +void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { + if (repo == NULL) return; + + STsdbRepo *pRepo = (STsdbRepo *)repo; + + // TODO: wait for commit over + + tsdbCloseFileH(pRepo); + tsdbCloseBufPool(pRepo); + tsdbCloseMeta(pRepo); + tsdbTrace("vgId:%d repository is closed", REPO_ID(pRepo)); return 0; } int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { STsdbRepo * pRepo = (STsdbRepo *)repo; - SSubmitMsgIter msgIter; + SSubmitMsgIter msgIter = {0}; if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) { - tsdbError("vgId:%d submit message is messed up", REPO_ID(pRepo)); - return terrno; + tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; } SSubmitBlk *pBlock = NULL; @@ -166,12 +132,13 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * TSKEY now = taosGetTimestamp(pRepo->config.precision); while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { - if ((code = tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows)) != TSDB_CODE_SUCCESS) { - return code; + if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) { + pRsp->affectedRows = htonl(affectedrows); + return -1; } } pRsp->affectedRows = htonl(affectedrows); - return code; + return 0; } uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size) { @@ -244,77 +211,67 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ return magic; } -int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { + +void tsdbStartStream(TSDB_REPO_T *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; - int16_t tversion = htons(pMsg->tversion); - - STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid)); - if (pTable == NULL) return TSDB_CODE_TDB_INVALID_TABLE_ID; - if (pTable->tableId.tid != htonl(pMsg->tid)) return TSDB_CODE_TDB_INVALID_TABLE_ID; - if (pTable->type != TSDB_CHILD_TABLE) { - tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", pRepo->config.tsdbId, - varDataVal(pTable->name), pTable->type); - return TSDB_CODE_TDB_INVALID_TABLE_TYPE; + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->type == TSDB_STREAM_TABLE) { + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TALBE_UID(pTable), TABLE_TID(pTable), pTable->sql, + tsdbGetTableSchema(pMeta, pTable)); + } } +} - if (schemaVersion(tsdbGetTableTagSchema(pMeta, pTable)) < tversion) { - tsdbTrace("vgId:%d server tag version %d is older than client tag version %d, try to config", pRepo->config.tsdbId, - schemaVersion(tsdbGetTableTagSchema(pMeta, pTable)), tversion); - void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid)); - if (msg == NULL) { - return terrno; - } - // Deal with error her - STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); - STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid); - ASSERT(super != NULL); +STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { + ASSERT(repo != NULL); + return &((STsdbRepo *)repo)->config; +} - int32_t code = tsdbUpdateTable(pMeta, super, pTableCfg); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - tsdbClearTableCfg(pTableCfg); - rpcFreeCont(msg); - } +int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + STsdbCfg * pRCfg = &pRepo->config; - STSchema *pTagSchema = tsdbGetTableTagSchema(pMeta, pTable); + if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_TDB_INVALID_CONFIG; - if (schemaVersion(pTagSchema) > tversion) { - tsdbError( - "vgId:%d failed to update tag value of table %s since version out of date, client tag version:%d server tag " - "version:%d", - pRepo->config.tsdbId, varDataVal(pTable->name), tversion, schemaVersion(pTable->tagSchema)); - return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; + ASSERT(pRCfg->tsdbId == pCfg->tsdbId); + ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); + ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile); + ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock); + ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock); + ASSERT(pRCfg->precision == pCfg->precision); + + bool configChanged = false; + if (pRCfg->compression != pCfg->compression) { + configChanged = true; + tsdbAlterCompression(pRepo, pCfg->compression); } - if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { - tsdbRemoveTableFromIndex(pMeta, pTable); + if (pRCfg->keep != pCfg->keep) { + configChanged = true; + tsdbAlterKeep(pRepo, pCfg->keep); } - // TODO: remove table from index if it is the first column of tag - tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data); - if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { - tsdbAddTableIntoIndex(pMeta, pTable); + if (pRCfg->totalBlocks != pCfg->totalBlocks) { + configChanged = true; + tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); + } + if (pRCfg->maxTables != pCfg->maxTables) { + configChanged = true; + tsdbAlterMaxTables(pRepo, pCfg->maxTables); } - return TSDB_CODE_SUCCESS; -} -void tsdbStartStream(TSDB_REPO_T *repo) { - STsdbRepo *pRepo = (STsdbRepo *)repo; - STsdbMeta *pMeta = pRepo->tsdbMeta; + if (configChanged) tsdbSaveConfig(pRepo); - for (int i = 0; i < pRepo->config.maxTables; i++) { - STable *pTable = pMeta->tables[i]; - if (pTable && pTable->type == TSDB_STREAM_TABLE) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, - pTable->sql, tsdbGetTableSchema(pMeta, pTable)); - } - } + return TSDB_CODE_SUCCESS; } -STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { +void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { ASSERT(repo != NULL); - return &((STsdbRepo *)repo)->config; + STsdbRepo *pRepo = repo; + *totalPoints = pRepo->stat.pointsWritten; + *totalStorage = pRepo->stat.totalStorage; + *compStorage = pRepo->stat.compStorage; } // ----------------- INTERNAL FUNCTIONS ----------------- @@ -352,84 +309,9 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { return 0; } -void *tsdbCommitData(void *arg) { - STsdbRepo *pRepo = (STsdbRepo *)arg; - STsdbMeta *pMeta = pRepo->tsdbMeta; - ASSERT(pRepo->imem != NULL); - ASSERT(pRepo->commit == 1); - - tsdbPrint("vgId:%d start to commit! keyFirst " PRId64 " keyLast " PRId64 " numOfRows " PRId64, REPO_ID(pRepo), - pRepo->imem->keyFirst, pRepo->imem->keyLast, pRepo->imem->numOfRows); - - // STsdbMeta * pMeta = pRepo->tsdbMeta; - // STsdbCache *pCache = pRepo->tsdbCache; - // STsdbCfg * pCfg = &(pRepo->config); - // SDataCols * pDataCols = NULL; - // SRWHelper whelper = {{0}}; - // if (pCache->imem == NULL) return NULL; - - tsdbPrint("vgId:%d, starting to commit....", pRepo->config.tsdbId); - - // Create the iterator to read from cache - SSkipListIterator **iters = tsdbCreateTableIters(pRepo); - if (iters == NULL) { - tsdbError("vgId:%d failed to create table iterators since %s", REPO_ID(pRepo), tstrerror(terrno)); - // TODO: deal with the error here - return NULL; - } - - if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); - // TODO - goto _exit; - } - - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d since %s", REPO_ID(pRepo), - pMeta->maxRowBytes, pMeta->maxCols, tstrerror(terrno)); - // TODO - goto _exit; - } - - int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); - int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); - - // Loop to commit to each file - for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { - ASSERT(false); - goto _exit; - } - } - - // Do retention actions - tsdbFitRetention(pRepo); - if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); - -_exit: - tdFreeDataCols(pDataCols); - tsdbDestroyTableIters(iters, pCfg->maxTables); - tsdbDestroyHelper(&whelper); - - tsdbLockRepo(arg); - tdListMove(pCache->imem->list, pCache->pool.memPool); - tsdbAdjustCacheBlocks(pCache); - tdListFree(pCache->imem->list); - free(pCache->imem); - pCache->imem = NULL; - pRepo->commit = 0; - for (int i = 1; i < pCfg->maxTables; i++) { - STable *pTable = pMeta->tables[i]; - if (pTable && pTable->imem) { - tsdbFreeMemTable(pTable->imem); - pTable->imem = NULL; - } - } - tsdbUnLockRepo(arg); - tsdbPrint("vgId:%d, commit over....", pRepo->config.tsdbId); - - return NULL; -} +STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } +STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; } +STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } // ----------------- LOCAL FUNCTIONS ----------------- static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { @@ -757,7 +639,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY int64_t points = 0; STable *pTable == tsdbGetTableByUid(pMeta, pBlock->uid); - if (pTable == NULL || TABLE_TID(pTable)) { + if (pTable == NULL || TABLE_TID(pTable) != pBlock->tid) { tsdbError("vgId:%d failed to get table to insert data, uid " PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, pBlock->tid); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; @@ -770,40 +652,43 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY return -1; } - // // Check schema version - // int32_t tversion = pBlock->sversion; - // STSchema * pSchema = tsdbGetTableSchema(pMeta, pTable); - // ASSERT(pSchema != NULL); - // int16_t nversion = schemaVersion(pSchema); - // if (tversion > nversion) { - // tsdbTrace("vgId:%d table:%s tid:%d server schema version %d is older than clien version %d, try to config.", - // pRepo->config.tsdbId, varDataVal(pTable->name), pTable->tableId.tid, nversion, tversion); - // void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pTable->tableId.tid); - // if (msg == NULL) { - // return terrno; - // } - // // Deal with error her - // STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); - // STable *pTableUpdate = NULL; - // if (pTable->type == TSDB_CHILD_TABLE) { - // pTableUpdate = tsdbGetTableByUid(pMeta, pTableCfg->superUid); - // } else { - // pTableUpdate = pTable; - // } - - // int32_t code = tsdbUpdateTable(pMeta, pTableUpdate, pTableCfg); - // if (code != TSDB_CODE_SUCCESS) { - // return code; - // } - // tsdbClearTableCfg(pTableCfg); - // rpcFreeCont(msg); - // } else { - // if (tsdbGetTableSchemaByVersion(pMeta, pTable, tversion) == NULL) { - // tsdbError("vgId:%d table:%s tid:%d invalid schema version %d from client", pRepo->config.tsdbId, - // varDataVal(pTable->name), pTable->tableId.tid, tversion); - // return TSDB_CODE_TDB_TABLE_SCHEMA_VERSION; - // } - // } + // Check schema version + int32_t tversion = pBlock->sversion; + STSchema * pSchema = tsdbGetTableSchema(pMeta, pTable); + ASSERT(pSchema != NULL); + int16_t nversion = schemaVersion(pSchema); + if (tversion > nversion) { + tsdbTrace("vgId:%d table %s tid %d server schema version %d is older than clien version %d, try to config.", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), nversion, tversion); + void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable)); + if (msg == NULL) return -1; + + // TODO: Deal with error her + STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); + STable * pTableUpdate = NULL; + if (pTable->type == TSDB_CHILD_TABLE) { + pTableUpdate = tsdbGetTableByUid(pMeta, pTableCfg->superUid); + } else { + pTableUpdate = pTable; + } + + int32_t code = tsdbUpdateTable(pMeta, pTableUpdate, pTableCfg); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + tsdbClearTableCfg(pTableCfg); + rpcFreeCont(msg); + + pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, tversion); + } else if (tversion < nversion) { + pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, tversion); + if (pSchema == NULL) { + tsdbError("vgId:%d table %s tid %d invalid schema version %d from client", REPO_ID(pRepo), + TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), tversion); + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + return -1; + } + } SSubmitBlkIter blkIter = {0}; SDataRow row = NULL; @@ -817,17 +702,17 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY tsdbError("vgId:%d table %s tid %d uid %ld timestamp is out of range! now " PRId64 " maxKey " PRId64 " minKey " PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TALBE_UID(pTable), now, minKey, maxKey); - return TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; - } - - if (tdInsertRowToTable(pRepo, row, pTable) < 0) { + terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; return -1; } + + if (tsdbInsertRowToMem(pRepo, row, pTable) < 0) return -1; + (*affectedrows)++; points++; } - atomic_fetch_add_64(&(pRepo->stat.pointsWritten), points * (pSchema->numOfCols)); - atomic_fetch_add_64(&(pRepo->stat.totalStorage), points * pSchema->vlen); + pRepo->stat.pointsWritten += points * schemaNCols(pSchema); + pRepo->stat.totalStorage += points * schemaVLen(pSchema); return 0; } @@ -868,185 +753,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { return row; } -static int32_t tsdbInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) { - // TODO - int32_t level = 0; - int32_t headSize = 0; - - if (pTable->mem == NULL) { - pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); - if (pTable->mem == NULL) return -1; - pTable->mem->pData = - tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTSTupleKey); - pTable->mem->keyFirst = INT64_MAX; - pTable->mem->keyLast = 0; - } - - tSkipListNewNodeInfo(pTable->mem->pData, &level, &headSize); - - TSKEY key = dataRowKey(row); - // printf("insert:%lld, size:%d\n", key, pTable->mem->numOfRows); - - // Copy row into the memory - SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key); - if (pNode == NULL) { - // TODO: deal with allocate failure - } - - pNode->level = level; - dataRowCpy(SL_GET_NODE_DATA(pNode), row); - - // Insert the skiplist node into the data - if (pTable->mem == NULL) { - pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); - if (pTable->mem == NULL) return -1; - pTable->mem->pData = - tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTSTupleKey); - pTable->mem->keyFirst = INT64_MAX; - pTable->mem->keyLast = 0; - } - tSkipListPut(pTable->mem->pData, pNode); - if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; - if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; - if (key > pTable->lastKey) pTable->lastKey = key; - - pTable->mem->numOfRows = tSkipListGetSize(pTable->mem->pData); - - tsdbTrace("vgId:%d, tid:%d, uid:%" PRId64 ", table:%s a row is inserted to table! key:%" PRId64, pRepo->config.tsdbId, - pTable->tableId.tid, pTable->tableId.uid, varDataVal(pTable->name), dataRowKey(row)); - - return 0; -} - -static SSkipListIterator **tsdbCreateTableIters(STsdbRepo *pRepo) { - STsdbCfg *pCfg = &(pRepo->config); - - SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *)); - if (iters == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - for (int tid = 1; tid < maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; - if (pTable == NULL || pTable->imem == NULL || pTable->imem->numOfRows == 0) continue; - - iters[tid] = tSkipListCreateIter(pTable->imem->pData); - if (iters[tid] == NULL) goto _err; - - if (!tSkipListIterNext(iters[tid])) goto _err; - } - - return iters; - -_err: - tsdbDestroyTableIters(iters, maxTables); - return NULL; -} - -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, - SDataCols *pDataCols) { - char dataDir[128] = {0}; - STsdbMeta * pMeta = pRepo->tsdbMeta; - STsdbFileH *pFileH = pRepo->tsdbFileH; - STsdbCfg * pCfg = &pRepo->config; - SFileGroup *pGroup = NULL; - - TSKEY minKey = 0, maxKey = 0; - tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - - // Check if there are data to commit to this file - int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); - if (!hasDataToCommit) return 0; // No data to commit, just return - - // Create and open files for commit - tsdbGetDataDirName(pRepo, dataDir); - if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) { - tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid); - goto _err; - } - - // Open files for write/read - if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { - tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId); - goto _err; - } - - // Loop to commit data in each table - for (int tid = 1; tid < pCfg->maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; - if (pTable == NULL) continue; - - SSkipListIterator *pIter = iters[tid]; - - // Set the helper and the buffer dataCols object to help to write this table - tsdbSetHelperTable(pHelper, pTable, pRepo); - tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable)); - - // Loop to write the data in the cache to files. If no data to write, just break the loop - int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; - int nLoop = 0; - while (true) { - int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols); - assert(rowsRead >= 0); - if (pDataCols->numOfRows == 0) break; - nLoop++; - - ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); - ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); - - int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); - ASSERT(rowsWritten != 0); - if (rowsWritten < 0) goto _err; - ASSERT(rowsWritten <= pDataCols->numOfRows); - - tdPopDataColsPoints(pDataCols, rowsWritten); - maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows; - } - - ASSERT(pDataCols->numOfRows == 0); - - // Move the last block to the new .l file if neccessary - if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { - tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId); - goto _err; - } - - // Write the SCompBlock part - if (tsdbWriteCompInfo(pHelper) < 0) { - tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId); - goto _err; - } - } - - if (tsdbWriteCompIdx(pHelper) < 0) { - tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId); - goto _err; - } - - tsdbCloseHelperFile(pHelper, 0); - // TODO: make it atomic with some methods - pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF; - pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF; - pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF; - - return 0; - -_err: - ASSERT(false); - tsdbCloseHelperFile(pHelper, 1); - return -1; -} - -static char *getTSTupleKey(const void *data) { - SDataRow row = (SDataRow)data; - return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE); -} - -#if 0 -** - * Set the default TSDB configuration - */ static int tsdbRestoreInfo(STsdbRepo *pRepo) { STsdbMeta * pMeta = pRepo->tsdbMeta; @@ -1057,11 +763,12 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { SRWHelper rhelper = {{0}}; if (tsdbInitReadHelper(&rhelper, pRepo) < 0) goto _err; - tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_ASC); + + tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC); while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) { if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err; for (int i = 1; i < pRepo->config.maxTables; i++) { - STable * pTable = pMeta->tables[i]; + STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; SCompIdx *pIdx = &rhelper.pCompIdx[i]; @@ -1077,93 +784,7 @@ _err: return -1; } -/** - * Change the configuration of a repository - * @param pCfg the repository configuration, the upper layer should free the pointer - * - * @return 0 for success, -1 for failure and the error number is set - */ -int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { - STsdbRepo *pRepo = (STsdbRepo *)repo; - STsdbCfg * pRCfg = &pRepo->config; - - if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_TDB_INVALID_CONFIG; - - ASSERT(pRCfg->tsdbId == pCfg->tsdbId); - ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); - ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile); - ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock); - ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock); - ASSERT(pRCfg->precision == pCfg->precision); - - bool configChanged = false; - if (pRCfg->compression != pCfg->compression) { - configChanged = true; - tsdbAlterCompression(pRepo, pCfg->compression); - } - if (pRCfg->keep != pCfg->keep) { - configChanged = true; - tsdbAlterKeep(pRepo, pCfg->keep); - } - if (pRCfg->totalBlocks != pCfg->totalBlocks) { - configChanged = true; - tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); - } - if (pRCfg->maxTables != pCfg->maxTables) { - configChanged = true; - tsdbAlterMaxTables(pRepo, pCfg->maxTables); - } - - if (configChanged) tsdbSaveConfig(pRepo); - - return TSDB_CODE_SUCCESS; -} - - -/** - * Get the TSDB repository information, including some statistics - * @param pRepo the TSDB repository handle - * @param error the error number to set when failure occurs - * - * @return a info struct handle on success, NULL for failure and the error number is set. The upper - * layers should free the info handle themselves or memory leak will occur - */ -STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { - // TODO - return NULL; -} - - -TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) { - STsdbRepo *pRepo = (STsdbRepo *)repo; - - STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, uid); - if (pTable == NULL) return -1; - - return TSDB_GET_TABLE_LAST_KEY(pTable); -} - - -STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tableId) { - // TODO - return NULL; -} - -// TODO: need to return the number of data inserted - -void tsdbClearTableCfg(STableCfg *config) { - if (config) { - if (config->schema) tdFreeSchema(config->schema); - if (config->tagSchema) tdFreeSchema(config->tagSchema); - if (config->tagValues) kvRowFree(config->tagValues); - tfree(config->name); - tfree(config->sname); - tfree(config->sql); - free(config); - } -} - -int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { +static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { if (pBlock->len <= 0) return -1; pIter->totalLen = pBlock->len; pIter->len = 0; @@ -1171,125 +792,58 @@ int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { return 0; } - - - -STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo) { - STsdbRepo *tsdb = (STsdbRepo *)pRepo; - return tsdb->tsdbMeta; -} - -STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo) { - STsdbRepo* tsdb = (STsdbRepo*) pRepo; - return tsdb->tsdbFileH; +static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) { + int8_t oldCompRession = pRepo->config.compression; + pRepo->config.compression = compression; + tsdbTrace("vgId:%d tsdb compression is changed from %d to %d", oldCompRession, compression); } -// Check the configuration and set default options - - - -static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg) { - char fname[128] = "\0"; - - if (tsdbGetCfgFname(pRepo, fname) < 0) return -1; - - int fd = open(fname, O_RDONLY); - if (fd < 0) { - return -1; - } +static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { + STsdbCfg *pCfg = &pRepo->config; + int oldKeep = pCfg->keep; - if (read(fd, (void *)pCfg, sizeof(STsdbCfg)) < sizeof(STsdbCfg)) { - close(fd); - return -1; + int maxFiles = keep / pCfg->maxTables + 3; + if (pRepo->config.keep > keep) { + pRepo->config.keep = keep; + pRepo->tsdbFileH->maxFGroups = maxFiles; + } else { + pRepo->config.keep = keep; + pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup)); + if (pRepo->tsdbFileH->fGroup == NULL) { + // TODO: deal with the error + } + pRepo->tsdbFileH->maxFGroups = maxFiles; } - - close(fd); - - return 0; + tsdbTrace("vgId:%d, keep is changed from %d to %d", pRepo->config.tsdbId, oldKeep, keep); } - - -static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) { - char fname[260]; - if (pRepo == NULL) return 0; - char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2); - if (dirName == NULL) { - return -1; - } - - sprintf(dirName, "%s/%s", pRepo->rootDir, "tsdb"); - - DIR *dir = opendir(dirName); - if (dir == NULL) return -1; - - struct dirent *dp; - while ((dp = readdir(dir)) != NULL) { - if ((strcmp(dp->d_name, ".") == 0) || (strcmp(dp->d_name, "..") == 0)) continue; - sprintf(fname, "%s/%s", pRepo->rootDir, dp->d_name); - remove(fname); +static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { + int oldMaxTables = pRepo->config.maxTables; + if (oldMaxTables < pRepo->config.maxTables) { + // TODO } - closedir(dir); - - rmdir(dirName); - - return 0; -} - - -static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { - ASSERT(maxRowsToRead > 0); - if (pIter == NULL) return 0; - STSchema *pSchema = NULL; - - int numOfRows = 0; - - do { - if (numOfRows >= maxRowsToRead) break; - - SSkipListNode *node = tSkipListIterGet(pIter); - if (node == NULL) break; - - SDataRow row = SL_GET_NODE_DATA(node); - if (dataRowKey(row) > maxKey) break; - - if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row)); - if (pSchema == NULL) { - // TODO: deal with the error here - ASSERT(false); - } - } + STsdbMeta *pMeta = pRepo->tsdbMeta; - tdAppendDataRowToDataCol(row, pSchema, pCols); - numOfRows++; - } while (tSkipListIterNext(pIter)); + pMeta->maxTables = maxTables; + pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *)); + memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables - oldMaxTables)); + pRepo->config.maxTables = maxTables; - return numOfRows; + tsdbTrace("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables); } -static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) { - if (iters == NULL) return; - - for (int tid = 1; tid < maxTables; tid++) { - if (iters[tid] == NULL) continue; - tSkipListDestroyIter(iters[tid]); - } +#if 0 - free(iters); -} +TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, uid); + if (pTable == NULL) return -1; -static void tsdbFreeMemTable(SMemTable *pMemTable) { - if (pMemTable) { - tSkipListDestroy(pMemTable->pData); - free(pMemTable); - } + return TSDB_GET_TABLE_LAST_KEY(pTable); } -// Commit to file - /** * Return the next iterator key. @@ -1317,54 +871,8 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK return 0; } -static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) { - int8_t oldCompRession = pRepo->config.compression; - pRepo->config.compression = compression; - tsdbTrace("vgId:%d, tsdb compression is changed from %d to %d", oldCompRession, compression); -} -static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { - STsdbCfg *pCfg = &pRepo->config; - int oldKeep = pCfg->keep; - int maxFiles = keep / pCfg->maxTables + 3; - if (pRepo->config.keep > keep) { - pRepo->config.keep = keep; - pRepo->tsdbFileH->maxFGroups = maxFiles; - } else { - pRepo->config.keep = keep; - pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup)); - if (pRepo->tsdbFileH->fGroup == NULL) { - // TODO: deal with the error - } - pRepo->tsdbFileH->maxFGroups = maxFiles; - } - tsdbTrace("vgId:%d, keep is changed from %d to %d", pRepo->config.tsdbId, oldKeep, keep); -} -static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { - int oldMaxTables = pRepo->config.maxTables; - if (oldMaxTables < pRepo->config.maxTables) { - // TODO - } - - STsdbMeta *pMeta = pRepo->tsdbMeta; - - pMeta->maxTables = maxTables; - pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *)); - memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables-oldMaxTables)); - pRepo->config.maxTables = maxTables; - - tsdbTrace("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables); -} - - -void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage){ - ASSERT(repo != NULL); - STsdbRepo * pRepo = repo; - *totalPoints = pRepo->stat.pointsWritten; - *totalStorage = pRepo->stat.totalStorage; - *compStorage = pRepo->stat.compStorage; -} #endif \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 0a693cedc65fdc68917db96b6178a197049ea1e2..593454243dc24badd2115a0389ad4ffa1acc4102 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -317,4 +317,245 @@ static void tsdbFreeTableData(STableData *pTableData) { } } -static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); } \ No newline at end of file +static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); } + +static void *tsdbCommitData(void *arg) { + STsdbRepo *pRepo = (STsdbRepo *)arg; + STsdbMeta *pMeta = pRepo->tsdbMeta; + ASSERT(pRepo->imem != NULL); + ASSERT(pRepo->commit == 1); + + tsdbPrint("vgId:%d start to commit! keyFirst " PRId64 " keyLast " PRId64 " numOfRows " PRId64, REPO_ID(pRepo), + pRepo->imem->keyFirst, pRepo->imem->keyLast, pRepo->imem->numOfRows); + + // STsdbMeta * pMeta = pRepo->tsdbMeta; + // STsdbCache *pCache = pRepo->tsdbCache; + // STsdbCfg * pCfg = &(pRepo->config); + // SDataCols * pDataCols = NULL; + // SRWHelper whelper = {{0}}; + // if (pCache->imem == NULL) return NULL; + + tsdbPrint("vgId:%d, starting to commit....", pRepo->config.tsdbId); + + // Create the iterator to read from cache + SSkipListIterator **iters = tsdbCreateTableIters(pRepo); + if (iters == NULL) { + tsdbError("vgId:%d failed to create table iterators since %s", REPO_ID(pRepo), tstrerror(terrno)); + // TODO: deal with the error here + return NULL; + } + + if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { + tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + // TODO + goto _exit; + } + + if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d since %s", REPO_ID(pRepo), + pMeta->maxRowBytes, pMeta->maxCols, tstrerror(terrno)); + // TODO + goto _exit; + } + + int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); + int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + + // Loop to commit to each file + for (int fid = sfid; fid <= efid; fid++) { + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + ASSERT(false); + goto _exit; + } + } + + // Do retention actions + tsdbFitRetention(pRepo); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); + +_exit: + tdFreeDataCols(pDataCols); + tsdbDestroyTableIters(iters, pCfg->maxTables); + tsdbDestroyHelper(&whelper); + + tsdbLockRepo(arg); + tdListMove(pCache->imem->list, pCache->pool.memPool); + tsdbAdjustCacheBlocks(pCache); + tdListFree(pCache->imem->list); + free(pCache->imem); + pCache->imem = NULL; + pRepo->commit = 0; + for (int i = 1; i < pCfg->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->imem) { + tsdbFreeMemTable(pTable->imem); + pTable->imem = NULL; + } + } + tsdbUnLockRepo(arg); + tsdbPrint("vgId:%d, commit over....", pRepo->config.tsdbId); + + return NULL; +} + +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, + SDataCols *pDataCols) { + char dataDir[128] = {0}; + STsdbMeta * pMeta = pRepo->tsdbMeta; + STsdbFileH *pFileH = pRepo->tsdbFileH; + STsdbCfg * pCfg = &pRepo->config; + SFileGroup *pGroup = NULL; + + TSKEY minKey = 0, maxKey = 0; + tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + + // Check if there are data to commit to this file + int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); + if (!hasDataToCommit) return 0; // No data to commit, just return + + // Create and open files for commit + tsdbGetDataDirName(pRepo, dataDir); + if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) { + tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid); + goto _err; + } + + // Open files for write/read + if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { + tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId); + goto _err; + } + + // Loop to commit data in each table + for (int tid = 1; tid < pCfg->maxTables; tid++) { + STable *pTable = pMeta->tables[tid]; + if (pTable == NULL) continue; + + SSkipListIterator *pIter = iters[tid]; + + // Set the helper and the buffer dataCols object to help to write this table + tsdbSetHelperTable(pHelper, pTable, pRepo); + tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable)); + + // Loop to write the data in the cache to files. If no data to write, just break the loop + int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; + int nLoop = 0; + while (true) { + int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols); + assert(rowsRead >= 0); + if (pDataCols->numOfRows == 0) break; + nLoop++; + + ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); + ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); + + int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); + ASSERT(rowsWritten != 0); + if (rowsWritten < 0) goto _err; + ASSERT(rowsWritten <= pDataCols->numOfRows); + + tdPopDataColsPoints(pDataCols, rowsWritten); + maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows; + } + + ASSERT(pDataCols->numOfRows == 0); + + // Move the last block to the new .l file if neccessary + if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { + tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId); + goto _err; + } + + // Write the SCompBlock part + if (tsdbWriteCompInfo(pHelper) < 0) { + tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId); + goto _err; + } + } + + if (tsdbWriteCompIdx(pHelper) < 0) { + tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId); + goto _err; + } + + tsdbCloseHelperFile(pHelper, 0); + // TODO: make it atomic with some methods + pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF; + pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF; + pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF; + + return 0; + +_err: + ASSERT(false); + tsdbCloseHelperFile(pHelper, 1); + return -1; +} + +static SSkipListIterator **tsdbCreateTableIters(STsdbRepo *pRepo) { + STsdbCfg *pCfg = &(pRepo->config); + + SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *)); + if (iters == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + for (int tid = 1; tid < maxTables; tid++) { + STable *pTable = pMeta->tables[tid]; + if (pTable == NULL || pTable->imem == NULL || pTable->imem->numOfRows == 0) continue; + + iters[tid] = tSkipListCreateIter(pTable->imem->pData); + if (iters[tid] == NULL) goto _err; + + if (!tSkipListIterNext(iters[tid])) goto _err; + } + + return iters; + +_err: + tsdbDestroyTableIters(iters, maxTables); + return NULL; +} + +static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) { + if (iters == NULL) return; + + for (int tid = 1; tid < maxTables; tid++) { + if (iters[tid] == NULL) continue; + tSkipListDestroyIter(iters[tid]); + } + + free(iters); +} + +static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { + ASSERT(maxRowsToRead > 0); + if (pIter == NULL) return 0; + STSchema *pSchema = NULL; + + int numOfRows = 0; + + do { + if (numOfRows >= maxRowsToRead) break; + + SSkipListNode *node = tSkipListIterGet(pIter); + if (node == NULL) break; + + SDataRow row = SL_GET_NODE_DATA(node); + if (dataRowKey(row) > maxKey) break; + + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row)); + if (pSchema == NULL) { + // TODO: deal with the error here + ASSERT(false); + } + } + + tdAppendDataRowToDataCol(row, pSchema, pCols); + numOfRows++; + } while (tSkipListIterNext(pIter)); + + return numOfRows; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index d9b054ecbbb0e2b4c850bb0f70a6cec1504faaf4..10866bbae5daf5b8a9e00cde5597136a31f9494a 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -42,6 +42,7 @@ static int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); static int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid); static int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup); static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup); +static void tsdbClearTableCfg(STableCfg *config); static void * tsdbEncodeTableName(void *buf, tstr *name); static void * tsdbDecodeTableName(void *buf, tstr **name); static void * tsdbEncodeTable(void *buf, STable *pTable); @@ -243,6 +244,67 @@ _err: return NULL; } +int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + STsdbMeta *pMeta = pRepo->tsdbMeta; + int16_t tversion = htons(pMsg->tversion); + + STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid)); + if (pTable == NULL) { + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + if (TABLE_TID(pTable) != htonl(pMsg->tid)) { + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID + return -1; + } + + if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { + tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", REPO_ID(pRepo), + TABLE_CHAR_NAME(pTable), TABLE_TYPE(pTable)); + terrno = TSDB_CODE_TDB_INVALID_ACTION; + return -1; + } + + if (schemaVersion(tsdbGetTableTagSchema(pTable)) < tversion) { + tsdbTrace("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo), + schemaVersion(tsdbGetTableTagSchema(pTable)), tversion); + void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid)); + if (msg == NULL) return -1; + + // Deal with error her + STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); + STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid); + ASSERT(super != NULL); + + int32_t code = tsdbUpdateTable(pMeta, super, pTableCfg); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + tsdbClearTableCfg(pTableCfg); + rpcFreeCont(msg); + } + + STSchema *pTagSchema = tsdbGetTableTagSchema(pTable); + + if (schemaVersion(pTagSchema) > tversion) { + tsdbError( + "vgId:%d failed to update tag value of table %s since version out of date, client tag version:%d server tag " + "version:%d", + pRepo->config.tsdbId, varDataVal(pTable->name), tversion, schemaVersion(pTable->tagSchema)); + return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; + } + if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { + tsdbRemoveTableFromIndex(pMeta, pTable); + } + // TODO: remove table from index if it is the first column of tag + tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data); + if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { + tsdbAddTableIntoIndex(pMeta, pTable); + } + return TSDB_CODE_SUCCESS; +} + // ------------------ INTERNAL FUNCTIONS ------------------ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) { STsdbMeta *pMeta = (STsdbMeta *)calloc(1, sizeof(*pMeta)); @@ -896,6 +958,18 @@ static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) { return 0; } +static void tsdbClearTableCfg(STableCfg *config) { + if (config) { + if (config->schema) tdFreeSchema(config->schema); + if (config->tagSchema) tdFreeSchema(config->tagSchema); + if (config->tagValues) kvRowFree(config->tagValues); + tfree(config->name); + tfree(config->sname); + tfree(config->sql); + free(config); + } +} + static void *tsdbEncodeTableName(void *buf, tstr *name) { void *pBuf = buf; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index e57188b479d2e5328d85a0b8df2d1c8873552b68..032dbde9eac8e4edc07fbe017f6de1e7cdf4cd0b 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -87,7 +87,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { } static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; // save insert result into item @@ -96,7 +96,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR pRet->len = sizeof(SShellSubmitRspMsg); pRet->rsp = rpcMallocCont(pRet->len); SShellSubmitRspMsg *pRsp = pRet->rsp; - code = tsdbInsertData(pVnode->tsdb, pCont, pRsp); + if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno; pRsp->numOfFailedBlocks = 0; //TODO //pRet->len += pRsp->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); //TODO pRsp->code = 0;