提交 77b4dcf8 编写于 作者: H Hongze Cheng

TD-353

上级 618b3d22
......@@ -178,7 +178,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "vnode app
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, 0, 0x0601, "tsdb invalid table type")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, 0, 0x0603, "tsdb table already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, 0, 0x0604, "tsdb invalid configuration")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INIT_FAILED, 0, 0x0605, "tsdb init failed")
......
......@@ -84,15 +84,10 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
goto _err;
}
// // Restore key from file
// if (tsdbRestoreInfo(pRepo) < 0) {
// tsdbFreeCache(pRepo->tsdbCache);
// tsdbFreeMeta(pRepo->tsdbMeta);
// tsdbCloseFileH(pRepo->tsdbFileH);
// free(pRepo->rootDir);
// free(pRepo);
// return NULL;
// }
if (tsdbRestoreInfo(pRepo) < 0) {
tsdbError("vgId:%d failed to restore info from file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// pRepo->state = TSDB_REPO_STATE_ACTIVE;
......@@ -106,57 +101,28 @@ _err:
return NULL;
}
int32_t tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
// TODO
// STsdbRepo *pRepo = (STsdbRepo *)repo;
// if (pRepo == NULL) return 0;
// int id = pRepo->config.tsdbId;
// pRepo->state = TSDB_REPO_STATE_CLOSED;
// tsdbLockRepo(repo);
// if (pRepo->commit) {
// tsdbUnLockRepo(repo);
// return -1;
// }
// pRepo->commit = 1;
// // Loop to move pData to iData
// for (int i = 1; i < pRepo->config.maxTables; i++) {
// STable *pTable = pRepo->tsdbMeta->tables[i];
// if (pTable != NULL && pTable->mem != NULL) {
// pTable->imem = pTable->mem;
// pTable->mem = NULL;
// }
// }
// // TODO: Loop to move mem to imem
// pRepo->tsdbCache->imem = pRepo->tsdbCache->mem;
// pRepo->tsdbCache->mem = NULL;
// pRepo->tsdbCache->curBlock = NULL;
// tsdbUnLockRepo(repo);
// if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
// if (toCommit) tsdbCommitData((void *)repo);
// tsdbCloseFileH(pRepo->tsdbFileH);
// tsdbFreeMeta(pRepo->tsdbMeta);
// tsdbFreeCache(pRepo->tsdbCache);
// tfree(pRepo->rootDir);
// tfree(pRepo);
// tsdbTrace("vgId:%d repository is closed!", id);
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if (repo == NULL) return;
STsdbRepo *pRepo = (STsdbRepo *)repo;
// TODO: wait for commit over
tsdbCloseFileH(pRepo);
tsdbCloseBufPool(pRepo);
tsdbCloseMeta(pRepo);
tsdbTrace("vgId:%d repository is closed", REPO_ID(pRepo));
return 0;
}
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
STsdbRepo * pRepo = (STsdbRepo *)repo;
SSubmitMsgIter msgIter;
SSubmitMsgIter msgIter = {0};
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) {
tsdbError("vgId:%d submit message is messed up", REPO_ID(pRepo));
return terrno;
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
SSubmitBlk *pBlock = NULL;
......@@ -166,12 +132,13 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
TSKEY now = taosGetTimestamp(pRepo->config.precision);
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
if ((code = tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows)) != TSDB_CODE_SUCCESS) {
return code;
if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) {
pRsp->affectedRows = htonl(affectedrows);
return -1;
}
}
pRsp->affectedRows = htonl(affectedrows);
return code;
return 0;
}
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size) {
......@@ -244,77 +211,67 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
return magic;
}
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
void tsdbStartStream(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
int16_t tversion = htons(pMsg->tversion);
STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid));
if (pTable == NULL) return TSDB_CODE_TDB_INVALID_TABLE_ID;
if (pTable->tableId.tid != htonl(pMsg->tid)) return TSDB_CODE_TDB_INVALID_TABLE_ID;
if (pTable->type != TSDB_CHILD_TABLE) {
tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", pRepo->config.tsdbId,
varDataVal(pTable->name), pTable->type);
return TSDB_CODE_TDB_INVALID_TABLE_TYPE;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TALBE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchema(pMeta, pTable));
}
if (schemaVersion(tsdbGetTableTagSchema(pMeta, pTable)) < tversion) {
tsdbTrace("vgId:%d server tag version %d is older than client tag version %d, try to config", pRepo->config.tsdbId,
schemaVersion(tsdbGetTableTagSchema(pMeta, pTable)), tversion);
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid));
if (msg == NULL) {
return terrno;
}
// Deal with error her
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
ASSERT(super != NULL);
}
int32_t code = tsdbUpdateTable(pMeta, super, pTableCfg);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg);
}
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) {
ASSERT(repo != NULL);
return &((STsdbRepo *)repo)->config;
}
STSchema *pTagSchema = tsdbGetTableTagSchema(pMeta, pTable);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg * pRCfg = &pRepo->config;
if (schemaVersion(pTagSchema) > tversion) {
tsdbError(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version:%d server tag "
"version:%d",
pRepo->config.tsdbId, varDataVal(pTable->name), tversion, schemaVersion(pTable->tagSchema));
return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_TDB_INVALID_CONFIG;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
bool configChanged = false;
if (pRCfg->compression != pCfg->compression) {
configChanged = true;
tsdbAlterCompression(pRepo, pCfg->compression);
}
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
tsdbRemoveTableFromIndex(pMeta, pTable);
if (pRCfg->keep != pCfg->keep) {
configChanged = true;
tsdbAlterKeep(pRepo, pCfg->keep);
}
// TODO: remove table from index if it is the first column of tag
tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data);
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
tsdbAddTableIntoIndex(pMeta, pTable);
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
configChanged = true;
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
}
if (pRCfg->maxTables != pCfg->maxTables) {
configChanged = true;
tsdbAlterMaxTables(pRepo, pCfg->maxTables);
}
return TSDB_CODE_SUCCESS;
}
void tsdbStartStream(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (configChanged) tsdbSaveConfig(pRepo);
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid,
pTable->sql, tsdbGetTableSchema(pMeta, pTable));
}
}
return TSDB_CODE_SUCCESS;
}
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) {
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL);
return &((STsdbRepo *)repo)->config;
STsdbRepo *pRepo = repo;
*totalPoints = pRepo->stat.pointsWritten;
*totalStorage = pRepo->stat.totalStorage;
*compStorage = pRepo->stat.compStorage;
}
// ----------------- INTERNAL FUNCTIONS -----------------
......@@ -352,84 +309,9 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
return 0;
}
void *tsdbCommitData(void *arg) {
STsdbRepo *pRepo = (STsdbRepo *)arg;
STsdbMeta *pMeta = pRepo->tsdbMeta;
ASSERT(pRepo->imem != NULL);
ASSERT(pRepo->commit == 1);
tsdbPrint("vgId:%d start to commit! keyFirst " PRId64 " keyLast " PRId64 " numOfRows " PRId64, REPO_ID(pRepo),
pRepo->imem->keyFirst, pRepo->imem->keyLast, pRepo->imem->numOfRows);
// STsdbMeta * pMeta = pRepo->tsdbMeta;
// STsdbCache *pCache = pRepo->tsdbCache;
// STsdbCfg * pCfg = &(pRepo->config);
// SDataCols * pDataCols = NULL;
// SRWHelper whelper = {{0}};
// if (pCache->imem == NULL) return NULL;
tsdbPrint("vgId:%d, starting to commit....", pRepo->config.tsdbId);
// Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create table iterators since %s", REPO_ID(pRepo), tstrerror(terrno));
// TODO: deal with the error here
return NULL;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
// TODO
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d since %s", REPO_ID(pRepo),
pMeta->maxRowBytes, pMeta->maxCols, tstrerror(terrno));
// TODO
goto _exit;
}
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
ASSERT(false);
goto _exit;
}
}
// Do retention actions
tsdbFitRetention(pRepo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyTableIters(iters, pCfg->maxTables);
tsdbDestroyHelper(&whelper);
tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool);
tsdbAdjustCacheBlocks(pCache);
tdListFree(pCache->imem->list);
free(pCache->imem);
pCache->imem = NULL;
pRepo->commit = 0;
for (int i = 1; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->imem) {
tsdbFreeMemTable(pTable->imem);
pTable->imem = NULL;
}
}
tsdbUnLockRepo(arg);
tsdbPrint("vgId:%d, commit over....", pRepo->config.tsdbId);
return NULL;
}
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
// ----------------- LOCAL FUNCTIONS -----------------
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
......@@ -757,7 +639,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
int64_t points = 0;
STable *pTable == tsdbGetTableByUid(pMeta, pBlock->uid);
if (pTable == NULL || TABLE_TID(pTable)) {
if (pTable == NULL || TABLE_TID(pTable) != pBlock->tid) {
tsdbError("vgId:%d failed to get table to insert data, uid " PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
......@@ -770,40 +652,43 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
return -1;
}
// // Check schema version
// int32_t tversion = pBlock->sversion;
// STSchema * pSchema = tsdbGetTableSchema(pMeta, pTable);
// ASSERT(pSchema != NULL);
// int16_t nversion = schemaVersion(pSchema);
// if (tversion > nversion) {
// tsdbTrace("vgId:%d table:%s tid:%d server schema version %d is older than clien version %d, try to config.",
// pRepo->config.tsdbId, varDataVal(pTable->name), pTable->tableId.tid, nversion, tversion);
// void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pTable->tableId.tid);
// if (msg == NULL) {
// return terrno;
// }
// // Deal with error her
// STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
// STable *pTableUpdate = NULL;
// if (pTable->type == TSDB_CHILD_TABLE) {
// pTableUpdate = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
// } else {
// pTableUpdate = pTable;
// }
// int32_t code = tsdbUpdateTable(pMeta, pTableUpdate, pTableCfg);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// tsdbClearTableCfg(pTableCfg);
// rpcFreeCont(msg);
// } else {
// if (tsdbGetTableSchemaByVersion(pMeta, pTable, tversion) == NULL) {
// tsdbError("vgId:%d table:%s tid:%d invalid schema version %d from client", pRepo->config.tsdbId,
// varDataVal(pTable->name), pTable->tableId.tid, tversion);
// return TSDB_CODE_TDB_TABLE_SCHEMA_VERSION;
// }
// }
// Check schema version
int32_t tversion = pBlock->sversion;
STSchema * pSchema = tsdbGetTableSchema(pMeta, pTable);
ASSERT(pSchema != NULL);
int16_t nversion = schemaVersion(pSchema);
if (tversion > nversion) {
tsdbTrace("vgId:%d table %s tid %d server schema version %d is older than clien version %d, try to config.",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), nversion, tversion);
void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable));
if (msg == NULL) return -1;
// TODO: Deal with error her
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
STable * pTableUpdate = NULL;
if (pTable->type == TSDB_CHILD_TABLE) {
pTableUpdate = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
} else {
pTableUpdate = pTable;
}
int32_t code = tsdbUpdateTable(pMeta, pTableUpdate, pTableCfg);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg);
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, tversion);
} else if (tversion < nversion) {
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, tversion);
if (pSchema == NULL) {
tsdbError("vgId:%d table %s tid %d invalid schema version %d from client", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), tversion);
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
}
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
......@@ -817,17 +702,17 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
tsdbError("vgId:%d table %s tid %d uid %ld timestamp is out of range! now " PRId64 " maxKey " PRId64
" minKey " PRId64,
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TALBE_UID(pTable), now, minKey, maxKey);
return TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
}
if (tdInsertRowToTable(pRepo, row, pTable) < 0) {
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
if (tsdbInsertRowToMem(pRepo, row, pTable) < 0) return -1;
(*affectedrows)++;
points++;
}
atomic_fetch_add_64(&(pRepo->stat.pointsWritten), points * (pSchema->numOfCols));
atomic_fetch_add_64(&(pRepo->stat.totalStorage), points * pSchema->vlen);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
return 0;
}
......@@ -868,185 +753,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
return row;
}
static int32_t tsdbInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
// TODO
int32_t level = 0;
int32_t headSize = 0;
if (pTable->mem == NULL) {
pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable));
if (pTable->mem == NULL) return -1;
pTable->mem->pData =
tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTSTupleKey);
pTable->mem->keyFirst = INT64_MAX;
pTable->mem->keyLast = 0;
}
tSkipListNewNodeInfo(pTable->mem->pData, &level, &headSize);
TSKEY key = dataRowKey(row);
// printf("insert:%lld, size:%d\n", key, pTable->mem->numOfRows);
// Copy row into the memory
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key);
if (pNode == NULL) {
// TODO: deal with allocate failure
}
pNode->level = level;
dataRowCpy(SL_GET_NODE_DATA(pNode), row);
// Insert the skiplist node into the data
if (pTable->mem == NULL) {
pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable));
if (pTable->mem == NULL) return -1;
pTable->mem->pData =
tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTSTupleKey);
pTable->mem->keyFirst = INT64_MAX;
pTable->mem->keyLast = 0;
}
tSkipListPut(pTable->mem->pData, pNode);
if (key > pTable->mem->keyLast) pTable->mem->keyLast = key;
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
if (key > pTable->lastKey) pTable->lastKey = key;
pTable->mem->numOfRows = tSkipListGetSize(pTable->mem->pData);
tsdbTrace("vgId:%d, tid:%d, uid:%" PRId64 ", table:%s a row is inserted to table! key:%" PRId64, pRepo->config.tsdbId,
pTable->tableId.tid, pTable->tableId.uid, varDataVal(pTable->name), dataRowKey(row));
return 0;
}
static SSkipListIterator **tsdbCreateTableIters(STsdbRepo *pRepo) {
STsdbCfg *pCfg = &(pRepo->config);
SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *));
if (iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
for (int tid = 1; tid < maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
if (pTable == NULL || pTable->imem == NULL || pTable->imem->numOfRows == 0) continue;
iters[tid] = tSkipListCreateIter(pTable->imem->pData);
if (iters[tid] == NULL) goto _err;
if (!tSkipListIterNext(iters[tid])) goto _err;
}
return iters;
_err:
tsdbDestroyTableIters(iters, maxTables);
return NULL;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper,
SDataCols *pDataCols) {
char dataDir[128] = {0};
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
SFileGroup *pGroup = NULL;
TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid);
goto _err;
}
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId);
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pCfg->maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
if (pTable == NULL) continue;
SSkipListIterator *pIter = iters[tid];
// Set the helper and the buffer dataCols object to help to write this table
tsdbSetHelperTable(pHelper, pTable, pRepo);
tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable));
// Loop to write the data in the cache to files. If no data to write, just break the loop
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
int nLoop = 0;
while (true) {
int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols);
assert(rowsRead >= 0);
if (pDataCols->numOfRows == 0) break;
nLoop++;
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
ASSERT(rowsWritten != 0);
if (rowsWritten < 0) goto _err;
ASSERT(rowsWritten <= pDataCols->numOfRows);
tdPopDataColsPoints(pDataCols, rowsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
}
ASSERT(pDataCols->numOfRows == 0);
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId);
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId);
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId);
goto _err;
}
tsdbCloseHelperFile(pHelper, 0);
// TODO: make it atomic with some methods
pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF;
pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF;
pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF;
return 0;
_err:
ASSERT(false);
tsdbCloseHelperFile(pHelper, 1);
return -1;
}
static char *getTSTupleKey(const void *data) {
SDataRow row = (SDataRow)data;
return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE);
}
#if 0
**
* Set the default TSDB configuration
*/
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
......@@ -1057,11 +763,12 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
SRWHelper rhelper = {{0}};
if (tsdbInitReadHelper(&rhelper, pRepo) < 0) goto _err;
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_ASC);
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC);
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
for (int i = 1; i < pRepo->config.maxTables; i++) {
STable * pTable = pMeta->tables[i];
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
SCompIdx *pIdx = &rhelper.pCompIdx[i];
......@@ -1077,93 +784,7 @@ _err:
return -1;
}
/**
* Change the configuration of a repository
* @param pCfg the repository configuration, the upper layer should free the pointer
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg * pRCfg = &pRepo->config;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_TDB_INVALID_CONFIG;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
bool configChanged = false;
if (pRCfg->compression != pCfg->compression) {
configChanged = true;
tsdbAlterCompression(pRepo, pCfg->compression);
}
if (pRCfg->keep != pCfg->keep) {
configChanged = true;
tsdbAlterKeep(pRepo, pCfg->keep);
}
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
configChanged = true;
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
}
if (pRCfg->maxTables != pCfg->maxTables) {
configChanged = true;
tsdbAlterMaxTables(pRepo, pCfg->maxTables);
}
if (configChanged) tsdbSaveConfig(pRepo);
return TSDB_CODE_SUCCESS;
}
/**
* Get the TSDB repository information, including some statistics
* @param pRepo the TSDB repository handle
* @param error the error number to set when failure occurs
*
* @return a info struct handle on success, NULL for failure and the error number is set. The upper
* layers should free the info handle themselves or memory leak will occur
*/
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) {
// TODO
return NULL;
}
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, uid);
if (pTable == NULL) return -1;
return TSDB_GET_TABLE_LAST_KEY(pTable);
}
STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tableId) {
// TODO
return NULL;
}
// TODO: need to return the number of data inserted
void tsdbClearTableCfg(STableCfg *config) {
if (config) {
if (config->schema) tdFreeSchema(config->schema);
if (config->tagSchema) tdFreeSchema(config->tagSchema);
if (config->tagValues) kvRowFree(config->tagValues);
tfree(config->name);
tfree(config->sname);
tfree(config->sql);
free(config);
}
}
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->len <= 0) return -1;
pIter->totalLen = pBlock->len;
pIter->len = 0;
......@@ -1171,125 +792,58 @@ int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
return 0;
}
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo) {
STsdbRepo *tsdb = (STsdbRepo *)pRepo;
return tsdb->tsdbMeta;
}
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo) {
STsdbRepo* tsdb = (STsdbRepo*) pRepo;
return tsdb->tsdbFileH;
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
int8_t oldCompRession = pRepo->config.compression;
pRepo->config.compression = compression;
tsdbTrace("vgId:%d tsdb compression is changed from %d to %d", oldCompRession, compression);
}
// Check the configuration and set default options
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg) {
char fname[128] = "\0";
if (tsdbGetCfgFname(pRepo, fname) < 0) return -1;
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
STsdbCfg *pCfg = &pRepo->config;
int oldKeep = pCfg->keep;
int fd = open(fname, O_RDONLY);
if (fd < 0) {
return -1;
int maxFiles = keep / pCfg->maxTables + 3;
if (pRepo->config.keep > keep) {
pRepo->config.keep = keep;
pRepo->tsdbFileH->maxFGroups = maxFiles;
} else {
pRepo->config.keep = keep;
pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup));
if (pRepo->tsdbFileH->fGroup == NULL) {
// TODO: deal with the error
}
if (read(fd, (void *)pCfg, sizeof(STsdbCfg)) < sizeof(STsdbCfg)) {
close(fd);
return -1;
pRepo->tsdbFileH->maxFGroups = maxFiles;
}
close(fd);
return 0;
tsdbTrace("vgId:%d, keep is changed from %d to %d", pRepo->config.tsdbId, oldKeep, keep);
}
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) {
char fname[260];
if (pRepo == NULL) return 0;
char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2);
if (dirName == NULL) {
return -1;
}
sprintf(dirName, "%s/%s", pRepo->rootDir, "tsdb");
DIR *dir = opendir(dirName);
if (dir == NULL) return -1;
struct dirent *dp;
while ((dp = readdir(dir)) != NULL) {
if ((strcmp(dp->d_name, ".") == 0) || (strcmp(dp->d_name, "..") == 0)) continue;
sprintf(fname, "%s/%s", pRepo->rootDir, dp->d_name);
remove(fname);
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
int oldMaxTables = pRepo->config.maxTables;
if (oldMaxTables < pRepo->config.maxTables) {
// TODO
}
closedir(dir);
rmdir(dirName);
return 0;
}
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
ASSERT(maxRowsToRead > 0);
if (pIter == NULL) return 0;
STSchema *pSchema = NULL;
int numOfRows = 0;
do {
if (numOfRows >= maxRowsToRead) break;
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) break;
SDataRow row = SL_GET_NODE_DATA(node);
if (dataRowKey(row) > maxKey) break;
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row));
if (pSchema == NULL) {
// TODO: deal with the error here
ASSERT(false);
}
}
STsdbMeta *pMeta = pRepo->tsdbMeta;
tdAppendDataRowToDataCol(row, pSchema, pCols);
numOfRows++;
} while (tSkipListIterNext(pIter));
pMeta->maxTables = maxTables;
pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *));
memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables - oldMaxTables));
pRepo->config.maxTables = maxTables;
return numOfRows;
tsdbTrace("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
}
static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) {
if (iters == NULL) return;
for (int tid = 1; tid < maxTables; tid++) {
if (iters[tid] == NULL) continue;
tSkipListDestroyIter(iters[tid]);
}
#if 0
free(iters);
}
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, uid);
if (pTable == NULL) return -1;
static void tsdbFreeMemTable(SMemTable *pMemTable) {
if (pMemTable) {
tSkipListDestroy(pMemTable->pData);
free(pMemTable);
}
return TSDB_GET_TABLE_LAST_KEY(pTable);
}
// Commit to file
/**
* Return the next iterator key.
......@@ -1317,54 +871,8 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK
return 0;
}
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
int8_t oldCompRession = pRepo->config.compression;
pRepo->config.compression = compression;
tsdbTrace("vgId:%d, tsdb compression is changed from %d to %d", oldCompRession, compression);
}
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
STsdbCfg *pCfg = &pRepo->config;
int oldKeep = pCfg->keep;
int maxFiles = keep / pCfg->maxTables + 3;
if (pRepo->config.keep > keep) {
pRepo->config.keep = keep;
pRepo->tsdbFileH->maxFGroups = maxFiles;
} else {
pRepo->config.keep = keep;
pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup));
if (pRepo->tsdbFileH->fGroup == NULL) {
// TODO: deal with the error
}
pRepo->tsdbFileH->maxFGroups = maxFiles;
}
tsdbTrace("vgId:%d, keep is changed from %d to %d", pRepo->config.tsdbId, oldKeep, keep);
}
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
int oldMaxTables = pRepo->config.maxTables;
if (oldMaxTables < pRepo->config.maxTables) {
// TODO
}
STsdbMeta *pMeta = pRepo->tsdbMeta;
pMeta->maxTables = maxTables;
pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *));
memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables-oldMaxTables));
pRepo->config.maxTables = maxTables;
tsdbTrace("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
}
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage){
ASSERT(repo != NULL);
STsdbRepo * pRepo = repo;
*totalPoints = pRepo->stat.pointsWritten;
*totalStorage = pRepo->stat.totalStorage;
*compStorage = pRepo->stat.compStorage;
}
#endif
\ No newline at end of file
......@@ -318,3 +318,244 @@ static void tsdbFreeTableData(STableData *pTableData) {
}
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); }
static void *tsdbCommitData(void *arg) {
STsdbRepo *pRepo = (STsdbRepo *)arg;
STsdbMeta *pMeta = pRepo->tsdbMeta;
ASSERT(pRepo->imem != NULL);
ASSERT(pRepo->commit == 1);
tsdbPrint("vgId:%d start to commit! keyFirst " PRId64 " keyLast " PRId64 " numOfRows " PRId64, REPO_ID(pRepo),
pRepo->imem->keyFirst, pRepo->imem->keyLast, pRepo->imem->numOfRows);
// STsdbMeta * pMeta = pRepo->tsdbMeta;
// STsdbCache *pCache = pRepo->tsdbCache;
// STsdbCfg * pCfg = &(pRepo->config);
// SDataCols * pDataCols = NULL;
// SRWHelper whelper = {{0}};
// if (pCache->imem == NULL) return NULL;
tsdbPrint("vgId:%d, starting to commit....", pRepo->config.tsdbId);
// Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create table iterators since %s", REPO_ID(pRepo), tstrerror(terrno));
// TODO: deal with the error here
return NULL;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
// TODO
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d since %s", REPO_ID(pRepo),
pMeta->maxRowBytes, pMeta->maxCols, tstrerror(terrno));
// TODO
goto _exit;
}
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
ASSERT(false);
goto _exit;
}
}
// Do retention actions
tsdbFitRetention(pRepo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyTableIters(iters, pCfg->maxTables);
tsdbDestroyHelper(&whelper);
tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool);
tsdbAdjustCacheBlocks(pCache);
tdListFree(pCache->imem->list);
free(pCache->imem);
pCache->imem = NULL;
pRepo->commit = 0;
for (int i = 1; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->imem) {
tsdbFreeMemTable(pTable->imem);
pTable->imem = NULL;
}
}
tsdbUnLockRepo(arg);
tsdbPrint("vgId:%d, commit over....", pRepo->config.tsdbId);
return NULL;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper,
SDataCols *pDataCols) {
char dataDir[128] = {0};
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
SFileGroup *pGroup = NULL;
TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid);
goto _err;
}
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId);
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pCfg->maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
if (pTable == NULL) continue;
SSkipListIterator *pIter = iters[tid];
// Set the helper and the buffer dataCols object to help to write this table
tsdbSetHelperTable(pHelper, pTable, pRepo);
tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable));
// Loop to write the data in the cache to files. If no data to write, just break the loop
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
int nLoop = 0;
while (true) {
int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols);
assert(rowsRead >= 0);
if (pDataCols->numOfRows == 0) break;
nLoop++;
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
ASSERT(rowsWritten != 0);
if (rowsWritten < 0) goto _err;
ASSERT(rowsWritten <= pDataCols->numOfRows);
tdPopDataColsPoints(pDataCols, rowsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
}
ASSERT(pDataCols->numOfRows == 0);
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId);
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId);
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId);
goto _err;
}
tsdbCloseHelperFile(pHelper, 0);
// TODO: make it atomic with some methods
pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF;
pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF;
pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF;
return 0;
_err:
ASSERT(false);
tsdbCloseHelperFile(pHelper, 1);
return -1;
}
static SSkipListIterator **tsdbCreateTableIters(STsdbRepo *pRepo) {
STsdbCfg *pCfg = &(pRepo->config);
SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *));
if (iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
for (int tid = 1; tid < maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
if (pTable == NULL || pTable->imem == NULL || pTable->imem->numOfRows == 0) continue;
iters[tid] = tSkipListCreateIter(pTable->imem->pData);
if (iters[tid] == NULL) goto _err;
if (!tSkipListIterNext(iters[tid])) goto _err;
}
return iters;
_err:
tsdbDestroyTableIters(iters, maxTables);
return NULL;
}
static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) {
if (iters == NULL) return;
for (int tid = 1; tid < maxTables; tid++) {
if (iters[tid] == NULL) continue;
tSkipListDestroyIter(iters[tid]);
}
free(iters);
}
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
ASSERT(maxRowsToRead > 0);
if (pIter == NULL) return 0;
STSchema *pSchema = NULL;
int numOfRows = 0;
do {
if (numOfRows >= maxRowsToRead) break;
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) break;
SDataRow row = SL_GET_NODE_DATA(node);
if (dataRowKey(row) > maxKey) break;
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row));
if (pSchema == NULL) {
// TODO: deal with the error here
ASSERT(false);
}
}
tdAppendDataRowToDataCol(row, pSchema, pCols);
numOfRows++;
} while (tSkipListIterNext(pIter));
return numOfRows;
}
\ No newline at end of file
......@@ -42,6 +42,7 @@ static int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
static int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
static int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup);
static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
static void tsdbClearTableCfg(STableCfg *config);
static void * tsdbEncodeTableName(void *buf, tstr *name);
static void * tsdbDecodeTableName(void *buf, tstr **name);
static void * tsdbEncodeTable(void *buf, STable *pTable);
......@@ -243,6 +244,67 @@ _err:
return NULL;
}
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
int16_t tversion = htons(pMsg->tversion);
STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid));
if (pTable == NULL) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TID(pTable) != htonl(pMsg->tid)) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID
return -1;
}
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TYPE(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
if (schemaVersion(tsdbGetTableTagSchema(pTable)) < tversion) {
tsdbTrace("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo),
schemaVersion(tsdbGetTableTagSchema(pTable)), tversion);
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid));
if (msg == NULL) return -1;
// Deal with error her
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
ASSERT(super != NULL);
int32_t code = tsdbUpdateTable(pMeta, super, pTableCfg);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg);
}
STSchema *pTagSchema = tsdbGetTableTagSchema(pTable);
if (schemaVersion(pTagSchema) > tversion) {
tsdbError(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version:%d server tag "
"version:%d",
pRepo->config.tsdbId, varDataVal(pTable->name), tversion, schemaVersion(pTable->tagSchema));
return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
}
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
tsdbRemoveTableFromIndex(pMeta, pTable);
}
// TODO: remove table from index if it is the first column of tag
tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data);
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
tsdbAddTableIntoIndex(pMeta, pTable);
}
return TSDB_CODE_SUCCESS;
}
// ------------------ INTERNAL FUNCTIONS ------------------
STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
STsdbMeta *pMeta = (STsdbMeta *)calloc(1, sizeof(*pMeta));
......@@ -896,6 +958,18 @@ static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
return 0;
}
static void tsdbClearTableCfg(STableCfg *config) {
if (config) {
if (config->schema) tdFreeSchema(config->schema);
if (config->tagSchema) tdFreeSchema(config->tagSchema);
if (config->tagValues) kvRowFree(config->tagValues);
tfree(config->name);
tfree(config->sname);
tfree(config->sql);
free(config);
}
}
static void *tsdbEncodeTableName(void *buf, tstr *name) {
void *pBuf = buf;
......
......@@ -87,7 +87,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
}
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
// save insert result into item
......@@ -96,7 +96,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len);
SShellSubmitRspMsg *pRsp = pRet->rsp;
code = tsdbInsertData(pVnode->tsdb, pCont, pRsp);
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
pRsp->numOfFailedBlocks = 0; //TODO
//pRet->len += pRsp->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); //TODO
pRsp->code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册