提交 b51c6ae1 编写于 作者: H Hongze Cheng

refact

上级 63a68154
...@@ -58,15 +58,6 @@ typedef struct SDataStatis { ...@@ -58,15 +58,6 @@ typedef struct SDataStatis {
} SDataStatis; } SDataStatis;
// --------- TSDB APPLICATION HANDLE DEFINITION // --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
void *appH;
void *cqH;
int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema,
int start);
void (*cqDropFunc)(void *handle);
} STsdbAppH;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION // --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef struct { typedef struct {
......
...@@ -16,22 +16,21 @@ ...@@ -16,22 +16,21 @@
#ifndef _TD_TSDB_INT_H_ #ifndef _TD_TSDB_INT_H_
#define _TD_TSDB_INT_H_ #define _TD_TSDB_INT_H_
#include "os.h" #include "os.h"
#include "tlog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "tarray.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tskiplist.h"
#include "tdataformat.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcompression.h" #include "tcompression.h"
#include "tlockfree.h" #include "tdataformat.h"
#include "tlist.h"
#include "thash.h"
#include "tarray.h"
#include "tfs.h" #include "tfs.h"
#include "thash.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tsdbMemory.h" #include "tsdbMemory.h"
#include "tskiplist.h"
#include "tsdb.h" #include "tsdb.h"
...@@ -62,31 +61,14 @@ extern "C" { ...@@ -62,31 +61,14 @@ extern "C" {
// Main definitions // Main definitions
struct STsdb { struct STsdb {
uint8_t state; uint8_t state;
STsdbCfg config; STsdbCfg config;
STsdbCfg save_config; // save apply config
bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config
int16_t cacheLastConfigVersion;
STsdbAppH appH;
STsdbStat stat; STsdbStat stat;
STsdbMeta* tsdbMeta; STsdbMeta* tsdbMeta;
// STsdbBufPool* pPool;
SMemTable* mem; SMemTable* mem;
SMemTable* imem; SMemTable* imem;
STsdbFS* fs; STsdbFS* fs;
SRtn rtn; SRtn rtn;
tsem_t readyToCommit; SMergeBuf mergeBuf; // used when update=2
pthread_mutex_t mutex;
bool repoLocked;
int32_t code; // Commit code
SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
pthread_t* pthread;
}; };
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId
...@@ -100,36 +82,11 @@ int tsdbUnlockRepo(STsdb* pRepo); ...@@ -100,36 +82,11 @@ int tsdbUnlockRepo(STsdb* pRepo);
STsdbMeta* tsdbGetMeta(STsdb* pRepo); STsdbMeta* tsdbGetMeta(STsdb* pRepo);
int tsdbCheckCommit(STsdb* pRepo); int tsdbCheckCommit(STsdb* pRepo);
int tsdbRestoreInfo(STsdb* pRepo); int tsdbRestoreInfo(STsdb* pRepo);
UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg); UNUSED_FUNC int tsdbCacheLastData(STsdb* pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdb *pRepo, STable* pTable); int32_t tsdbLoadLastCache(STsdb* pRepo, STable* pTable);
void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]);
// static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdb* pRepo) {
// ASSERT(pRepo != NULL);
// if (pRepo->mem == NULL) return NULL;
// SListNode* pNode = listTail(pRepo->mem->bufBlockList);
// if (pNode == NULL) return NULL;
// STsdbBufBlock* pBufBlock = NULL;
// tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock));
// return pBufBlock;
// }
// static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
// ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
// int maxTables = TSDB_INIT_NTABLES;
// while (true) {
// maxTables = MIN(maxTables, TSDB_MAX_TABLES);
// if (tid <= maxTables) break;
// maxTables *= 2;
// }
// return maxTables + 1;
// }
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
// no test file errors here // no test file errors here
#include "taosdef.h" #include "taosdef.h"
#include "tsdbint.h" #include "tsdbint.h"
#include "ttimer.h"
#include "tthread.h" #include "tthread.h"
#include "ttimer.h"
#define IS_VALID_PRECISION(precision) \ #define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
...@@ -25,46 +25,15 @@ ...@@ -25,46 +25,15 @@
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) #define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static STsdb *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); static STsdb * tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
static void tsdbFreeRepo(STsdb *pRepo); static void tsdbFreeRepo(STsdb *pRepo);
static void tsdbStartStream(STsdb *pRepo); static void tsdbStartStream(STsdb *pRepo);
static void tsdbStopStream(STsdb *pRepo); static void tsdbStopStream(STsdb *pRepo);
static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh); static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH *pReadh);
static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx); static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH *pReadh, SBlockIdx *pIdx);
// // Function declaration
// int32_t tsdbCreateRepo(int repoid) {
// char tsdbDir[TSDB_FILENAME_LEN] = "\0";
// char dataDir[TSDB_FILENAME_LEN] = "\0";
// tsdbGetRootDir(repoid, tsdbDir);
// if (tfsMkdir(tsdbDir) < 0) {
// goto _err;
// }
// tsdbGetDataDir(repoid, dataDir);
// if (tfsMkdir(dataDir) < 0) {
// goto _err;
// }
// // TODO: need to create current file with nothing in
// return 0;
// _err:
// tsdbError("vgId:%d failed to create TSDB repository since %s", repoid, tstrerror(terrno));
// return -1;
// }
// int32_t tsdbDropRepo(int repoid) { STsdb *tsdbOpen(const char *path, STsdbCfg *pCfg) {
// char tsdbDir[TSDB_FILENAME_LEN] = "\0"; STsdb * pTsdb;
// tsdbGetRootDir(repoid, tsdbDir);
// return tfsRmdir(tsdbDir);
// }
STsdb *tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdb *pRepo;
STsdbCfg config = *pCfg; STsdbCfg config = *pCfg;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -76,39 +45,39 @@ STsdb *tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -76,39 +45,39 @@ STsdb *tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH) {
} }
// Create new TSDB object // Create new TSDB object
if ((pRepo = tsdbNewRepo(&config, pAppH)) == NULL) { if ((pTsdb = tsdbNewRepo(&config, pAppH)) == NULL) {
tsdbError("vgId:%d failed to open TSDB repository while creating TSDB object since %s", config.tsdbId, tsdbError("vgId:%d failed to open TSDB repository while creating TSDB object since %s", config.tsdbId,
tstrerror(terrno)); tstrerror(terrno));
return NULL; return NULL;
} }
// Open meta // Open meta
if (tsdbOpenMeta(pRepo) < 0) { if (tsdbOpenMeta(pTsdb) < 0) {
tsdbError("vgId:%d failed to open TSDB repository while opening Meta since %s", config.tsdbId, tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while opening Meta since %s", config.tsdbId, tstrerror(terrno));
tsdbClose(pRepo, false); tsdbClose(pTsdb, false);
return NULL; return NULL;
} }
if (tsdbOpenFS(pRepo) < 0) { if (tsdbOpenFS(pTsdb) < 0) {
tsdbError("vgId:%d failed to open TSDB repository while opening FS since %s", config.tsdbId, tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while opening FS since %s", config.tsdbId, tstrerror(terrno));
tsdbClose(pRepo, false); tsdbClose(pTsdb, false);
return NULL; return NULL;
} }
// TODO: Restore information from data // TODO: Restore information from data
if ((!(pRepo->state & TSDB_STATE_BAD_DATA)) && tsdbRestoreInfo(pRepo) < 0) { if ((!(pTsdb->state & TSDB_STATE_BAD_DATA)) && tsdbRestoreInfo(pTsdb) < 0) {
tsdbError("vgId:%d failed to open TSDB repository while restore info since %s", config.tsdbId, tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while restore info since %s", config.tsdbId, tstrerror(terrno));
tsdbClose(pRepo, false); tsdbClose(pTsdb, false);
return NULL; return NULL;
} }
pRepo->mergeBuf = NULL; pTsdb->mergeBuf = NULL;
tsdbStartStream(pRepo); tsdbStartStream(pTsdb);
tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo)); tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pTsdb));
return pRepo; return pTsdb;
} }
// Note: all working thread and query thread must stopped when calling this function // Note: all working thread and query thread must stopped when calling this function
...@@ -121,7 +90,7 @@ int tsdbClose(STsdb *repo, int toCommit) { ...@@ -121,7 +90,7 @@ int tsdbClose(STsdb *repo, int toCommit) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo); tsdbStopStream(pRepo);
if(pRepo->pthread){ if (pRepo->pthread) {
taosDestoryThread(pRepo->pthread); taosDestoryThread(pRepo->pthread);
pRepo->pthread = NULL; pRepo->pthread = NULL;
} }
...@@ -192,7 +161,8 @@ int tsdbUnlockRepo(STsdb *pRepo) { ...@@ -192,7 +161,8 @@ int tsdbUnlockRepo(STsdb *pRepo) {
// STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); // STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
// ASSERT(pBufBlock != NULL); // ASSERT(pBufBlock != NULL);
// if ((pRepo->mem->extraBuffList != NULL) || // if ((pRepo->mem->extraBuffList != NULL) ||
// ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { // ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE)))
// {
// // trigger commit // // trigger commit
// if (tsdbAsyncCommit(pRepo) < 0) return -1; // if (tsdbAsyncCommit(pRepo) < 0) return -1;
// } // }
...@@ -219,7 +189,7 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { ...@@ -219,7 +189,7 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases // TODO: think about multithread cases
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
STsdbCfg * pRCfg = &repo->config; STsdbCfg *pRCfg = &repo->config;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId); ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
...@@ -259,7 +229,7 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { ...@@ -259,7 +229,7 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) {
return -1; return -1;
} }
STsdbCfg * pSaveCfg = &repo->save_config; STsdbCfg *pSaveCfg = &repo->save_config;
*pSaveCfg = repo->config; *pSaveCfg = repo->config;
pSaveCfg->compression = pCfg->compression; pSaveCfg->compression = pCfg->compression;
...@@ -269,14 +239,11 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { ...@@ -269,14 +239,11 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) {
pSaveCfg->cacheLastRow = pCfg->cacheLastRow; pSaveCfg->cacheLastRow = pCfg->cacheLastRow;
pSaveCfg->totalBlocks = pCfg->totalBlocks; pSaveCfg->totalBlocks = pCfg->totalBlocks;
tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(repo),
REPO_ID(repo), pRCfg->compression, pRCfg->keep, pRCfg->keep1, pRCfg->keep2, pRCfg->cacheLastRow, pRCfg->totalBlocks);
pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2, tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(repo),
pRCfg->cacheLastRow, pRCfg->totalBlocks); pSaveCfg->compression, pSaveCfg->keep, pSaveCfg->keep1, pSaveCfg->keep2, pSaveCfg->cacheLastRow,
tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", pSaveCfg->totalBlocks);
REPO_ID(repo),
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks);
repo->config_changed = true; repo->config_changed = true;
...@@ -335,91 +302,7 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { ...@@ -335,91 +302,7 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) {
#endif #endif
} }
uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { void tsdbGetRootDir(int repoid, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid); }
// TODO
return 0;
#if 0
STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
uint32_t magic = 0;
char * fname = NULL;
struct stat fState;
tsdbDebug("vgId:%d name:%s index:%d eindex:%d", pRepo->config.tsdbId, name, *index, eindex);
ASSERT(*index <= eindex);
if (name[0] == 0) { // get the file from index or after, but not larger than eindex
int fid = (*index) / TSDB_FILE_TYPE_MAX;
if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) {
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
fname = tsdbGetMetaFileName(pRepo->rootDir);
*index = TSDB_META_FILE_INDEX;
magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta);
sprintf(name, "tsdb/%s", TSDB_META_FILE_NAME);
} else {
return 0;
}
} else {
SFileGroup *pFGroup =
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
if (pFGroup->fileId == fid) {
SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX];
fname = strdup(TSDB_FILE_NAME(pFile));
magic = pFile->info.magic;
char *tfname = strdup(fname);
sprintf(name, "tsdb/%s/%s", TSDB_DATA_DIR_NAME, basename(tfname));
tfree(tfname);
} else {
if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < (int)eindex) {
SFile *pFile = &pFGroup->files[0];
fname = strdup(TSDB_FILE_NAME(pFile));
*index = pFGroup->fileId * TSDB_FILE_TYPE_MAX;
magic = pFile->info.magic;
char *tfname = strdup(fname);
sprintf(name, "tsdb/%s/%s", TSDB_DATA_DIR_NAME, basename(tfname));
tfree(tfname);
} else {
return 0;
}
}
}
} else { // get the named file at the specified index. If not there, return 0
fname = malloc(256);
sprintf(fname, "%s/vnode/vnode%d/%s", TFS_PRIMARY_PATH(), REPO_ID(pRepo), name);
if (access(fname, F_OK) != 0) {
tfree(fname);
return 0;
}
if (*index == TSDB_META_FILE_INDEX) { // get meta file
tsdbGetStoreInfo(fname, &magic, size);
} else {
char tfname[TSDB_FILENAME_LEN] = "\0";
sprintf(tfname, "vnode/vnode%d/tsdb/%s/%s", REPO_ID(pRepo), TSDB_DATA_DIR_NAME, basename(name));
tsdbGetFileInfoImpl(tfname, &magic, size);
}
tfree(fname);
return magic;
}
if (stat(fname, &fState) < 0) {
tfree(fname);
return 0;
}
*size = fState.st_size;
// magic = *size;
tfree(fname);
return magic;
#endif
}
void tsdbGetRootDir(int repoid, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid);
}
void tsdbGetDataDir(int repoid, char dirName[]) { void tsdbGetDataDir(int repoid, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid); snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid);
...@@ -550,8 +433,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -550,8 +433,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// update cacheLastRow // update cacheLastRow
if (pCfg->cacheLastRow != 0) { if (pCfg->cacheLastRow != 0) {
if (pCfg->cacheLastRow > 3) if (pCfg->cacheLastRow > 3) pCfg->cacheLastRow = 1;
pCfg->cacheLastRow = 1;
} }
return 0; return 0;
} }
...@@ -633,8 +515,9 @@ static void tsdbStartStream(STsdb *pRepo) { ...@@ -633,8 +515,9 @@ static void tsdbStartStream(STsdb *pRepo) {
for (int i = 0; i < pMeta->maxTables; i++) { for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) { if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql, pTable->cqhandle =
tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 0); (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data,
pTable->sql, tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 0);
} }
} }
} }
...@@ -650,8 +533,8 @@ static void tsdbStopStream(STsdb *pRepo) { ...@@ -650,8 +533,8 @@ static void tsdbStopStream(STsdb *pRepo) {
} }
} }
static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) { static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH *pReadh) {
//tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data); // tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data);
STSchema *pSchema = tsdbGetTableLatestSchema(pTable); STSchema *pSchema = tsdbGetTableLatestSchema(pTable);
if (pSchema == NULL) { if (pSchema == NULL) {
...@@ -659,10 +542,10 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) ...@@ -659,10 +542,10 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh)
return 0; return 0;
} }
SBlock* pBlock; SBlock * pBlock;
int numColumns; int numColumns;
int32_t blockIdx; int32_t blockIdx;
SDataStatis* pBlockStatis = NULL; SDataStatis *pBlockStatis = NULL;
// SMemRow row = NULL; // SMemRow row = NULL;
// restore last column data with last schema // restore last column data with last schema
...@@ -702,7 +585,7 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) ...@@ -702,7 +585,7 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh)
goto out; goto out;
} }
memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis)); memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis));
for(int32_t i = 0; i < numColumns; ++i) { for (int32_t i = 0; i < numColumns; ++i) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
pBlockStatis[i].colId = pCol->colId; pBlockStatis[i].colId = pCol->colId;
} }
...@@ -744,8 +627,8 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) ...@@ -744,8 +627,8 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh)
// OK,let's load row from backward to get not-null column // OK,let's load row from backward to get not-null column
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; SDataCol * pDataCol = pReadh->pDCols[0]->cols + i;
const void* pColData = tdGetColDataOfRow(pDataCol, rowId); const void *pColData = tdGetColDataOfRow(pDataCol, rowId);
// tdAppendColVal(memRowDataBody(row), pColData, pCol->type, pCol->offset); // tdAppendColVal(memRowDataBody(row), pColData, pCol->type, pCol->offset);
// SDataCol *pDataCol = readh.pDCols[0]->cols + j; // SDataCol *pDataCol = readh.pDCols[0]->cols + j;
// void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + // void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE +
...@@ -757,7 +640,8 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) ...@@ -757,7 +640,8 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh)
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
if (idx == -1) { if (idx == -1) {
tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId); tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo),
pTable->name->data, pCol->colId);
continue; continue;
} }
// save not-null column // save not-null column
...@@ -777,7 +661,8 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) ...@@ -777,7 +661,8 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh)
pTable->restoreColumnNum += 1; pTable->restoreColumnNum += 1;
tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo),
pTable->name->data, pLastCol->colId, pLastCol->ts);
break; break;
} }
} }
...@@ -795,13 +680,13 @@ out: ...@@ -795,13 +680,13 @@ out:
return err; return err;
} }
static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH *pReadh, SBlockIdx *pIdx) {
ASSERT(pTable->lastRow == NULL); ASSERT(pTable->lastRow == NULL);
if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) { if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) {
return -1; return -1;
} }
SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1; SBlock *pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1;
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) { if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
return -1; return -1;
...@@ -877,7 +762,7 @@ int tsdbRestoreInfo(STsdb *pRepo) { ...@@ -877,7 +762,7 @@ int tsdbRestoreInfo(STsdb *pRepo) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue; if (pTable == NULL) continue;
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); // tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) { if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
...@@ -924,7 +809,8 @@ int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) { ...@@ -924,7 +809,8 @@ int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) {
bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config)); bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config));
bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config)); bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config));
tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol); tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow,
cacheLastCol);
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion; pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
...@@ -1005,7 +891,7 @@ int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) { ...@@ -1005,7 +891,7 @@ int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) {
return 0; return 0;
} }
UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg *oldCfg) {
bool cacheLastRow = false, cacheLastCol = false; bool cacheLastRow = false, cacheLastCol = false;
SFSIter fsiter; SFSIter fsiter;
SReadH readh; SReadH readh;
...@@ -1085,7 +971,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { ...@@ -1085,7 +971,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue; if (pTable == NULL) continue;
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); // tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) { if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册