Unverified · Commit f77c3a69 authored by: H haojun Liao, committed by: GitHub

Merge pull request #1434 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
......@@ -105,11 +105,33 @@ SDataRow tdDataRowDup(SDataRow row);
// ----------------- Data column structure
typedef struct SDataCol {
int64_t len;
char data[];
int8_t type;
int16_t colId;
int bytes;
int len;
int offset;
void * pData;
} SDataCol;
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter);
typedef struct {
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int numOfPoints;
int numOfCols; // Total number of cols
void * buf;
SDataCol cols[];
} SDataCols;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsKeyFirst(pCols) ((int64_t *)(keyCol(pCols)->pData))[0]
#define dataColsKeyLast(pCols) ((int64_t *)(keyCol(pCols)->pData))[(pCols)->numOfPoints - 1]
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols);
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
#ifdef __cplusplus
}
......
......@@ -294,14 +294,64 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow;
}
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) {
int row = *iter;
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
if (pCols == NULL) return NULL;
pCols->maxRowSize = maxRowSize;
pCols->maxCols = maxCols;
pCols->maxPoints = maxRows;
pCols->buf = malloc(maxRowSize * maxRows);
if (pCols->buf == NULL) {
free(pCols);
return NULL;
}
return pCols;
}
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
// assert(schemaNCols(pSchema) <= pCols->numOfCols);
tdResetDataCols(pCols);
pCols->numOfCols = schemaNCols(pSchema);
pCols->cols[0].pData = pCols->buf;
for (int i = 0; i < schemaNCols(pSchema); i++) {
// TODO
if (i > 0) {
pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints;
}
pCols->cols[i].type = colType(schemaColAt(pSchema, i));
pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i));
pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i));
pCols->cols[i].colId = colColId(schemaColAt(pSchema, i));
}
}
void tdFreeDataCols(SDataCols *pCols) {
if (pCols) {
if (pCols->buf) free(pCols->buf);
free(pCols);
}
}
*iter = row + 1;
void tdResetDataCols(SDataCols *pCols) {
pCols->numOfPoints = 0;
for (int i = 0; i < pCols->maxCols; i++) {
pCols->cols[i].len = 0;
}
}
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
TSKEY key = dataRowKey(row);
for (int i = 0; i < pCols->numOfCols; i++) {
SDataCol *pCol = pCols->cols + i;
memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes);
pCol->len += pCol->bytes;
}
pCols->numOfPoints++;
}
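A minimal usage sketch for the column buffer implemented above, assuming the dataformat.h declarations shown in this diff; the schema, rows, and size limits are caller-supplied placeholders introduced only for illustration.

  // Sketch only: assumes dataformat.h is included; pSchema/rows/limits come from the caller.
  static void dataColsSketch(STSchema *pSchema, SDataRow *rows, int nrows,
                             int maxRowBytes, int maxPoints) {
    SDataCols *pCols = tdNewDataCols(maxRowBytes, schemaNCols(pSchema), maxPoints);
    if (pCols == NULL) return;

    tdInitDataCols(pCols, pSchema);                // bind column type/bytes/offset to the buffer
    for (int i = 0; i < nrows && i < maxPoints; i++) {
      tdAppendDataRowToDataCol(rows[i], pCols);    // row-to-column conversion
    }

    if (pCols->numOfPoints > 0) {
      TSKEY first = dataColsKeyFirst(pCols);       // timestamp of the first appended row
      TSKEY last  = dataColsKeyLast(pCols);        // timestamp of the last appended row
      (void)first; (void)last;
    }

    tdResetDataCols(pCols);                        // clear lengths so the buffer can be reused
    tdFreeDataCols(pCols);
  }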
/**
......
......@@ -176,6 +176,14 @@ uint32_t ip2uint(const char *const ip_addr);
void taosSetAllocMode(int mode, const char* path, bool autoDump);
void taosDumpMemoryLeak();
#define TD_EQ 0x1
#define TD_GT 0x2
#define TD_LT 0x4
#define TD_GE (TD_EQ | TD_GT)
#define TD_LE (TD_EQ | TD_LT)
void *taosbsearch(const void *key, const void *base, size_t nmemb, size_t size,
int (*compar)(const void *, const void *), int flags);
#ifdef TAOS_MEM_CHECK
void * taos_malloc(size_t size, const char *file, uint32_t line);
......
......@@ -617,3 +617,72 @@ char *taosCharsetReplace(char *charsetstr) {
return strdup(charsetstr);
}
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) {
// TODO: need to check the correctness of this function
int l = 0;
int r = nmemb;
int idx = 0;
int comparison;
if (flags == TD_EQ) {
return bsearch(key, base, nmemb, size, compar);
} else if (flags == TD_GE) {
if ((*compar)(key, elePtrAt(base, size, 0)) <= 0) return elePtrAt(base, size, 0);
if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) > 0) return NULL;
while (l < r) {
idx = (l + r) / 2;
comparison = (*compar)(key, elePtrAt(base, size, idx));
if (comparison < 0) {
r = idx;
} else if (comparison > 0) {
l = idx + 1;
} else {
return elePtrAt(base, size, idx);
}
}
if ((*compar)(key, elePtrAt(base, size, idx)) < 0) {
return elePtrAt(base, size, idx);
} else {
if (idx + 1 > nmemb - 1) {
return NULL;
} else {
return elePtrAt(base, size, idx + 1);
}
}
} else if (flags == TD_LE) {
if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) >= 0) return elePtrAt(base, size, nmemb - 1);
if ((*compar)(key, elePtrAt(base, size, 0)) < 0) return NULL;
while (l < r) {
idx = (l + r) / 2;
comparison = (*compar)(key, elePtrAt(base, size, idx));
if (comparison < 0) {
r = idx;
} else if (comparison > 0) {
l = idx + 1;
} else {
return elePtrAt(base, size, idx);
}
}
if ((*compar)(key, elePtrAt(base, size, idx)) > 0) {
return elePtrAt(base, size, idx);
} else {
if (idx == 0) {
return NULL;
} else {
return elePtrAt(base, size, idx - 1);
}
}
} else {
assert(0);
return NULL;
}
return NULL;
}
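A small usage sketch for the flag-based search above, assuming the taosbsearch declaration and TD_* macros from tutil.h shown earlier; compareInt is a comparator written only for this illustration.

  #include <assert.h>
  #include "tutil.h"  // assumed to provide taosbsearch and the TD_* flags

  static int compareInt(const void *a, const void *b) {
    int x = *(const int *)a, y = *(const int *)b;
    return (x < y) ? -1 : (x > y) ? 1 : 0;
  }

  static void taosbsearchSketch() {
    int keys[] = {10, 20, 30, 40};
    int key = 25;

    // TD_GE: first element not less than the key; TD_LE: last element not greater than it.
    int *ge = (int *)taosbsearch(&key, keys, 4, sizeof(int), compareInt, TD_GE);
    int *le = (int *)taosbsearch(&key, keys, 4, sizeof(int), compareInt, TD_LE);
    assert(ge && *ge == 30);
    assert(le && *le == 20);
  }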
......@@ -17,6 +17,7 @@
#include <stdint.h>
#include "dataformat.h"
#include "taosdef.h"
#include "tglobalcfg.h"
......@@ -69,20 +70,26 @@ typedef struct {
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
void tsdbCloseFileH(STsdbFileH *pFileH);
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
int tsdbOpenFile(SFile *pFile, int oflag);
SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
typedef struct {
int32_t len;
int32_t padding; // For padding purpose
int64_t offset;
} SCompIdx;
int32_t offset;
int32_t hasLast : 1;
int32_t numOfSuperBlocks : 31;
int32_t checksum;
TSKEY maxKey;
} SCompIdx; /* sizeof(SCompIdx) = 24 */
/**
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
* the data block offset and length
* if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
* binary
* if numOfSubBlocks == 0, then the SCompBlock is a sub-block
* if numOfSubBlocks >= 1, then the SCompBlock is a super-block
* - if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
* the data block offset and length
* - if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
* binary
*/
typedef struct {
int64_t last : 1; // Whether the block is in the data file or the last file
......@@ -101,11 +108,12 @@ typedef struct {
int32_t delimiter; // For recovery usage
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
int64_t uid;
int32_t padding; // For padding purpose
int32_t numOfBlocks; // TODO: make the struct padding
SCompBlock blocks[];
} SCompInfo;
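A reader-side sketch of the numOfSubBlocks convention documented above, assuming SCompBlock carries the offset/len/numOfSubBlocks fields that the comment describes; the load helpers are hypothetical and left as comments.

  // Sketch only: walk one table's block list and dispatch on numOfSubBlocks.
  static void walkCompInfoSketch(SCompInfo *pCompInfo) {
    for (int i = 0; i < pCompInfo->numOfBlocks; i++) {
      SCompBlock *pBlock = pCompInfo->blocks + i;
      if (pBlock->numOfSubBlocks == 1) {
        // offset/len point directly at the data block
        // loadDataBlock(pBlock->offset, pBlock->len);            (hypothetical helper)
      } else if (pBlock->numOfSubBlocks > 1) {
        // offset/len point at the first of numOfSubBlocks sub-block entries
        // loadSubBlocks(pBlock->offset, pBlock->numOfSubBlocks); (hypothetical helper)
      } else {
        // numOfSubBlocks == 0: the entry is itself a sub-block owned by an earlier super-block
      }
    }
  }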
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf);
// TODO: take pre-calculation into account
typedef struct {
int16_t colId; // Column ID
......@@ -122,6 +130,8 @@ typedef struct {
SCompCol cols[];
} SCompData;
int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols);
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
#ifdef __cplusplus
}
......
......@@ -39,6 +39,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname);
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile);
static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles);
......@@ -70,9 +71,7 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables)
SFileGroup fGroup;
SFileGroup *pFGroup = &fGroup;
if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) ||
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) ==
NULL) {
if (tsdbSearchFGroup(pFileH, fid) == NULL) {
pFGroup->fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) {
......@@ -107,6 +106,86 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
return 0;
}
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1;
// TODO: need to check the correctness
return 0;
}
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, pIdx->len) < 0) return -1;
// TODO: need to check the correctness
return 0;
}
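A hedged sketch tying the two loaders above together: read the per-table SCompIdx array from the head file, then fetch one table's SCompInfo at the recorded offset. Error paths are reduced to early returns; the caller is assumed to have opened the file group (e.g. via tsdbOpenFilesForCommit) and to free the returned buffer.

  // Sketch only: assumes <stdlib.h> and the tsdbFile.h declarations above.
  static SCompInfo *loadTableCompInfoSketch(SFileGroup *pGroup, int maxTables, int tid) {
    SCompIdx *indices = (SCompIdx *)malloc(sizeof(SCompIdx) * maxTables);
    if (indices == NULL) return NULL;

    if (tsdbLoadCompIdx(pGroup, indices, maxTables) < 0) {  // whole per-table SCompIdx array
      free(indices);
      return NULL;
    }

    SCompIdx * pIdx = indices + tid;
    SCompInfo *pCompInfo = NULL;
    if (pIdx->len > 0) {
      pCompInfo = (SCompInfo *)malloc(pIdx->len);
      if (pCompInfo != NULL && tsdbLoadCompBlocks(pGroup, pIdx, pCompInfo) < 0) {  // one table's blocks
        free(pCompInfo);
        pCompInfo = NULL;
      }
    }

    free(indices);
    return pCompInfo;  // NULL if the table has no blocks or a read failed
  }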
static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write
SDataCols * pCols, // Data column buffer
int numOfPointsToWrite, // Number of points to write to the file
SCompBlock *pBlock // SCompBlock to hold block information to return
) {
// pBlock->last = 0;
// pBlock->offset = lseek(pFile->fd, 0, SEEK_END);
// // pBlock->algorithm = ;
// pBlock->numOfPoints = pCols->numOfPoints;
// // pBlock->sversion = ;
// // pBlock->len = ;
// pBlock->numOfSubBlocks = 1;
// pBlock->keyFirst = dataColsKeyFirst(pCols);
// pBlock->keyLast = dataColsKeyLast(pCols);
// for (int i = 0; i < pCols->numOfCols; i++) {
// // TODO: if all column values are NULL, do not save the column
// pBlock->numOfCols++;
// pCompData->numOfCols++;
// SCompCol *pCompCol = pCompData->cols + i;
// pCompCol->colId = pCols->cols[i].colId;
// pCompCol->type = pCols->cols[i].type;
// // pCompCol->len = ;
// // pCompCol->offset = ;
// }
return 0;
}
int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols) {
memset((void *)pBlock, 0, sizeof(SCompBlock));
SFile *pFile = NULL;
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols);
if (pCompData == NULL) return -1;
pCompData->delimiter = TSDB_FILE_DELIMITER;
// pCompData->uid = ;
if (isMerge) {
TSKEY keyFirst = dataColsKeyFirst(pCols);
// 1. Binary search the block the data can merged into
if (1 /* the data should only be merged into the last file */) {
} else {
}
} else {
// Write directly to the file without merge
if (1/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/) {
// TODO: write the data to the last file
} else {
// TODO: write the data to the data file
}
}
// TODO: need to update pIdx
if (pCompData) free(pCompData);
return 0;
}
static int compFGroupKey(const void *key, const void *fgroup) {
int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup;
......@@ -158,7 +237,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname)
return 0;
}
static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function
int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function
if (TSDB_IS_FILE_OPENED(pFile)) return -1;
pFile->fd = open(pFile->fname, oflag, 0755);
......@@ -167,6 +246,16 @@ static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the f
return 0;
}
SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid);
if (pGroup == NULL) return NULL;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbOpenFile(&(pGroup->files[type]), O_RDWR);
}
return pGroup;
}
static int tsdbCloseFile(SFile *pFile) {
if (!TSDB_IS_FILE_OPENED(pFile)) return -1;
int ret = close(pFile->fd);
......@@ -186,7 +275,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
return -1;
}
if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) {
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) {
// TODO: deal with the ERROR here
return -1;
}
......@@ -212,4 +301,12 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file
TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
}
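A worked instance of the key-range formula above, as a sketch that assumes millisecond precision with tsMsPerDay[precision] equal to 86400000 (one day in ms); the concrete numbers are chosen only to illustrate the arithmetic.

  // Sketch only: daysPerFile = 10, fileId = 3, precision assumed to index a millisecond entry.
  //   minKey = 3 * 10 * 86400000           = 2592000000
  //   maxKey = minKey + 10 * 86400000 - 1  = 3455999999
  static void keyRangeSketch() {
    TSKEY minKey = 0, maxKey = 0;
    tsdbGetKeyRangeOfFileId(10 /* daysPerFile */, 0 /* precision, assumed ms */, 3 /* fileId */,
                            &minKey, &maxKey);
    // Under the assumptions above: minKey == 2592000000, maxKey == 3455999999.
  }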
static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) {
if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId)
return NULL;
void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey);
if (ptr == NULL) return NULL;
return (SFileGroup *)ptr;
}
\ No newline at end of file
......@@ -84,7 +84,8 @@ static int tsdbOpenMetaFile(char *tsdbDir);
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitToFile(void *arg);
static void * tsdbCommitData(void *arg);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
......@@ -327,7 +328,7 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
pRepo->tsdbCache->curBlock = NULL;
// TODO: the commit thread should be detached or joined to avoid a memory leak
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo);
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitData, (void *)repo);
tsdbUnLockRepo(repo);
return 0;
......@@ -761,24 +762,22 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
return 0;
}
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCol **cols, STSchema *pSchema) {
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
int numOfRows = 0;
do {
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) break;
SDataRow row = SL_GET_NODE_DATA(node);
if (dataRowKey(row) > maxKey) break;
// Convert row data to column data
// for (int i = 0; i < schemaNCols(pSchema); i++) {
// STColumn *pCol = schemaColAt(pSchema, i);
// memcpy(cols[i]->data + TYPE_BYTES[colType(pCol)] * numOfRows, dataRowAt(row, pCol->offset),
// TYPE_BYTES[colType(pCol)]);
// }
tdAppendDataRowToDataCol(row, pCols);
numOfRows++;
if (numOfRows > maxRowsToRead) break;
} while (tSkipListIterNext(pIter));
return numOfRows;
}
......@@ -816,14 +815,14 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables)
}
// Commit to file
static void *tsdbCommitToFile(void *arg) {
static void *tsdbCommitData(void *arg) {
// TODO
printf("Starting to commit....\n");
STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCache *pCache = pRepo->tsdbCache;
STsdbCfg * pCfg = &(pRepo->config);
if (pCache->imem == NULL) return;
if (pCache->imem == NULL) return NULL;
// Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables);
......@@ -832,52 +831,23 @@ static void *tsdbCommitToFile(void *arg) {
return NULL;
}
int maxCols = pMeta->maxCols;
int maxBytes = pMeta->maxRowBytes;
SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols);
void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock);
// Create a data column buffer for commit
SDataCols *pCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock);
if (pCols == NULL) {
// TODO: deal with the error
return NULL;
}
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
for (int fid = sfid; fid <= efid; fid++) {
TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// tsdbOpenFileForWrite(pRepo, fid);
for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
if (pTable == NULL || pTable->imem == NULL) continue;
if (iters[tid] == NULL) { // create table iterator
iters[tid] = tSkipListCreateIter(pTable->imem->pData);
// TODO: deal with the error
if (iters[tid] == NULL) break;
if (!tSkipListIterNext(iters[tid])) {
// assert(0);
}
}
// Init row data part
cols[0] = (SDataCol *)buf;
for (int col = 1; col < schemaNCols(pTable->schema); col++) {
cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock);
}
// Loop the iterator
int rowsRead = 0;
while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) >
0) {
// printf("rowsRead:%d-----------\n", rowsRead);
int k = 0;
}
}
tsdbCommitToFile(pRepo, fid, iters, pCols);
}
tdFreeDataCols(pCols);
tsdbDestroyTableIters(iters, pCfg->maxTables);
free(buf);
free(cols);
tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool);
......@@ -894,4 +864,125 @@ static void *tsdbCommitToFile(void *arg) {
tsdbUnLockRepo(arg);
return NULL;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) {
int flag = 0;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
SFile tFile, lFile;
SFileGroup *pGroup = NULL;
SCompIdx * pIndices = NULL;
SCompInfo * pCompInfo = NULL;
size_t compInfoSize = 0;
SCompBlock compBlock;
SCompBlock *pBlock = &compBlock;
TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable * pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid];
int isLoadCompBlocks = 0;
if (pIter == NULL) continue;
tdInitDataCols(pCols, pTable->schema);
int numOfWrites = 0;
// while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) {
// break;
// if (!flag) {
// // There is data to commit to this file, so we need to create/open it for read/write.
// // Meanwhile, set the flag to prevent further create/open operations
// if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
// // Open files for commit
// pGroup = tsdbOpenFilesForCommit(pFileH, fid);
// if (pGroup == NULL) {
// // TODO: deal with the ERROR here
// }
// // TODO: open the .h file and, if necessary, the .l file
// {}
// pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
// if (pIndices == NULL) {
// // TODO: deal with the ERROR
// }
// // load the SCompIdx part
// if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
// // TODO: use sendfile for table content that does not need to change
// for (int ttid = 0; ttid < tid; ttid++) {
// // SCompIdx *pIdx = &pIndices[ttid];
// // if (pIdx->len > 0) {
// // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR);
// // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len);
// // }
// }
// flag = 1;
// }
// SCompIdx *pIdx = &pIndices[tid];
// /* The first time we write to the table, we need to decide
// * whether the SCompBlock part must be loaded. If it
// * is needed, just load it; otherwise, just use sendfile and
// * append it.
// */
// if (numOfWrites == 0 && pIdx->offset > 0) {
// if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
// if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
// // TODO: deal with the ERROR here
// }
// if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
// } else {
// // TODO: sendfile the prefix part
// }
// }
// // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) {
// // // TODO: deal with the ERROR here
// // }
// // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
// // if (1 /* the SCompBlock part is not loaded*/) {
// // // Append to the .data file, generate a SCompBlock, and record it
// // } else {
// // }
// // // TODO: need to reset the pCols
// numOfWrites++;
// }
// if (pCols->numOfPoints > 0) {
// // TODO: still has data to commit, commit it
// }
// if (1/* SCompBlock part is loaded, write it to .head file*/) {
// // TODO
// } else {
// // TODO: use sendfile send the old part and append the newly added part
// }
}
// Write the SCompIdx part
// Close all files and return
if (flag) {
// TODO
}
if (pIndices) free(pIndices);
if (pCompInfo) free(pCompInfo);
return 0;
}
\ No newline at end of file
......@@ -381,7 +381,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
int32_t headSize = 0;
// first tag column
STColumn* s = pSTable->tagSchema->columns[0]; //???
STColumn* s = schemaColAt(pSTable->tagSchema, 0);
tSkipListRandNodeInfo(pSTable->pIndex, &level, &headSize);
SSkipListNode* pNode = calloc(1, headSize + s->bytes + POINTER_BYTES);
......@@ -389,7 +389,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
SSkipList* list = pSTable->pIndex;
memcpy(SL_GET_NODE_KEY(list, pNode), dataRowTuple(pTable->tagVal), s->columns[0].bytes);
memcpy(SL_GET_NODE_KEY(list, pNode), dataRowTuple(pTable->tagVal), colBytes(s));
memcpy(SL_GET_NODE_DATA(pNode), &pTable, POINTER_BYTES);
tSkipListPut(list, pNode);
......