提交 5380a36a 编写于 作者: H hzcheng

TD-100

上级 d426627e
...@@ -353,8 +353,9 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -353,8 +353,9 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->cols[i].bytes = pDataCols->cols[i].bytes; pRet->cols[i].bytes = pDataCols->cols[i].bytes;
pRet->cols[i].len = pDataCols->cols[i].len; pRet->cols[i].len = pDataCols->cols[i].len;
pRet->cols[i].offset = pDataCols->cols[i].offset; pRet->cols[i].offset = pDataCols->cols[i].offset;
pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].len); if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].bytes * pDataCols->numOfPoints);
} }
return pRet; return pRet;
...@@ -410,36 +411,47 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { ...@@ -410,36 +411,47 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
SDataCols *pTarget = tdDupDataCols(target, true); SDataCols *pTarget = tdDupDataCols(target, true);
if (pTarget == NULL) goto _err; if (pTarget == NULL) goto _err;
tdResetDataCols(target);
int iter1 = 0; int iter1 = 0;
int iter2 = 0; int iter2 = 0;
while (true) { while (true) {
if (iter1 >= pTarget->numOfPoints) { if (iter1 >= pTarget->numOfPoints && iter2 >= source->numOfPoints) break;
// TODO: merge the source part
int rowsLeft = source->numOfPoints - iter2; TSKEY key1 = (iter1 >= pTarget->numOfPoints) ? INT64_MAX : ((TSKEY *)(pTarget->cols[0].pData))[iter1];
if (rowsLeft > 0) { TSKEY key2 = (iter2 >= rowsToMerge) ? INT64_MAX : ((TSKEY *)(source->cols[0].pData))[iter2];
for (int i = 0; i < source->numOfCols; i++) {
ASSERT(target->cols[i].type == source->cols[i].type);
if (key1 < key2) { // Copy from pTarget
for (int i = 0; i < pTarget->numOfCols; i++) {
ASSERT(target->cols[i].type == pTarget->cols[i].type);
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
(void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2), (void *)((char *)(pTarget->cols[i].pData) + TYPE_BYTES[pTarget->cols[i].type] * iter1),
TYPE_BYTES[target->cols[i].type] * rowsLeft); TYPE_BYTES[target->cols[i].type]);
} target->cols[i].len += TYPE_BYTES[target->cols[i].type];
}
break;
} }
if (iter2 >= source->numOfPoints) { target->numOfPoints++;
// TODO: merge the pTemp part iter1++;
int rowsLeft = pTarget->numOfPoints - iter1; } else if (key1 > key2) { // Copy from source
if (rowsLeft > 0) { for (int i = 0; i < source->numOfCols; i++) {
ASSERT(target->cols[i].type == pTarget->cols[i].type);
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
(void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2),
TYPE_BYTES[target->cols[i].type]);
target->cols[i].len += TYPE_BYTES[target->cols[i].type];
} }
break;
target->numOfPoints++;
iter2++;
} else {
assert(false);
} }
} }
tdFreeDataCols(pTarget);
return 0; return 0;
_err: _err:
tdFreeDataCols(pTarget);
return -1; return -1;
} }
\ No newline at end of file
...@@ -335,7 +335,7 @@ _err: ...@@ -335,7 +335,7 @@ _err:
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock compBlock; SCompBlock compBlock;
if ((pHelper->files.nHeadF.fd > 0) && (pHelper->hasOldLastBlock)) { if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) {
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfSuperBlocks - 1; SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfSuperBlocks - 1;
...@@ -375,6 +375,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -375,6 +375,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1;
} }
} else { } else {
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE); ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE);
...@@ -530,6 +532,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -530,6 +532,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
if (pCompData == NULL) return -1; if (pCompData == NULL) return -1;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err;
if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err;
ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);
...@@ -947,6 +950,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ...@@ -947,6 +950,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
(pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SCompBlock) * 2 : pIdx->len + sizeof(SCompBlock); (pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SCompBlock) * 2 : pIdx->len + sizeof(SCompBlock);
if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err;
pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
// Add the sub-block // Add the sub-block
if (pSCompBlock->numOfSubBlocks > 1) { if (pSCompBlock->numOfSubBlocks > 1) {
size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len); size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len);
......
...@@ -5,12 +5,84 @@ ...@@ -5,12 +5,84 @@
#include "dataformat.h" #include "dataformat.h"
#include "tsdbMain.h" #include "tsdbMain.h"
double getCurTime() { static double getCurTime() {
struct timeval tv; struct timeval tv;
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
return tv.tv_sec + tv.tv_usec * 1E-6; return tv.tv_sec + tv.tv_usec * 1E-6;
} }
typedef struct {
tsdb_repo_t *pRepo;
int tid;
int64_t uid;
int sversion;
TSKEY startTime;
TSKEY interval;
int totalRows;
int rowsPerSubmit;
STSchema * pSchema;
} SInsertInfo;
static int insertData(SInsertInfo *pInfo) {
SSubmitMsg *pMsg =
(SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit);
if (pMsg == NULL) return -1;
TSKEY start_time = pInfo->startTime;
// Loop to write data
double stime = getCurTime();
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
SSubmitBlk *pBlock = pMsg->blocks;
pBlock->uid = pInfo->uid;
pBlock->tid = pInfo->tid;
pBlock->sversion = pInfo->sversion;
pBlock->len = 0;
for (int i = 0; i < pInfo->rowsPerSubmit; i++) {
// start_time += 1000;
start_time += pInfo->interval;
SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, pInfo->pSchema);
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&start_time), schemaColAt(pInfo->pSchema, j));
} else { // For int
int val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(pInfo->pSchema, j));
}
}
pBlock->len += dataRowLen(row);
}
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
pMsg->numOfBlocks = 1;
pBlock->len = htonl(pBlock->len);
pBlock->numOfRows = htonl(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding);
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
pMsg->compressed = htonl(pMsg->numOfBlocks);
if (tsdbInsertData(pInfo->pRepo, pMsg) < 0) {
tfree(pMsg);
return -1;
}
}
double etime = getCurTime();
printf("Spent %f seconds to write %d records\n", etime - stime, pInfo->totalRows);
tfree(pMsg);
return 0;
}
TEST(TsdbTest, DISABLED_tableEncodeDecode) { TEST(TsdbTest, DISABLED_tableEncodeDecode) {
// TEST(TsdbTest, tableEncodeDecode) { // TEST(TsdbTest, tableEncodeDecode) {
STable *pTable = (STable *)malloc(sizeof(STable)); STable *pTable = (STable *)malloc(sizeof(STable));
...@@ -51,6 +123,7 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ...@@ -51,6 +123,7 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) {
// TEST(TsdbTest, DISABLED_createRepo) { // TEST(TsdbTest, DISABLED_createRepo) {
TEST(TsdbTest, createRepo) { TEST(TsdbTest, createRepo) {
STsdbCfg config; STsdbCfg config;
STsdbRepo *repo;
// 1. Create a tsdb repository // 1. Create a tsdb repository
tsdbSetDefaultCfg(&config); tsdbSetDefaultCfg(&config);
...@@ -79,64 +152,73 @@ TEST(TsdbTest, createRepo) { ...@@ -79,64 +152,73 @@ TEST(TsdbTest, createRepo) {
tsdbCreateTable(pRepo, &tCfg); tsdbCreateTable(pRepo, &tCfg);
// // 3. Loop to write some simple data // Insert Some Data
int nRows = 10000000; SInsertInfo iInfo = {
int rowsPerSubmit = 10; .pRepo = pRepo,
int64_t start_time = 1584081000000; .tid = tCfg.tableId.tid,
.uid = tCfg.tableId.uid,
SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); .sversion = tCfg.sversion,
.startTime = 1584081000000,
double stime = getCurTime(); .interval = 1000,
.totalRows = 50,
for (int k = 0; k < nRows/rowsPerSubmit; k++) { .rowsPerSubmit = 1,
memset((void *)pMsg, 0, sizeof(SSubmitMsg)); .pSchema = schema
SSubmitBlk *pBlock = pMsg->blocks; };
pBlock->uid = 987607499877672L;
pBlock->tid = 0; ASSERT_EQ(insertData(&iInfo), 0);
pBlock->sversion = 0;
pBlock->len = 0; // Close the repository
for (int i = 0; i < rowsPerSubmit; i++) { tsdbCloseRepo(pRepo);
// start_time += 1000;
start_time += 1000;
SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, schema);
for (int j = 0; j < schemaNCols(schema); j++) {
if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j));
} else { // For int
int val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
}
}
pBlock->len += dataRowLen(row);
}
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
pMsg->numOfBlocks = 1;
pBlock->len = htonl(pBlock->len);
pBlock->numOfRows = htonl(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding);
pMsg->length = htonl(pMsg->length); // Open the repository again
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
pMsg->compressed = htonl(pMsg->numOfBlocks); repo = (STsdbRepo *)pRepo;
ASSERT_NE(pRepo, nullptr);
tsdbInsertData(pRepo, pMsg); // Insert more data
} iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows;
iInfo.totalRows = 10;
iInfo.pRepo = pRepo;
ASSERT_EQ(insertData(&iInfo), 0);
double etime = getCurTime(); // Close the repository
tsdbCloseRepo(pRepo);
void *ptr = malloc(150000); // Open the repository again
free(ptr); pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
repo = (STsdbRepo *)pRepo;
ASSERT_NE(pRepo, nullptr);
printf("Spent %f seconds to write %d records\n", etime - stime, nRows); // Read from file
SRWHelper rhelper;
SHelperCfg helperCfg = {
.type = TSDB_READ_HELPER,
.maxTables = repo->config.maxTables,
.maxRowSize = repo->tsdbMeta->maxRowBytes,
.maxRows = repo->config.maxRowsPerFileBlock,
.maxCols = repo->tsdbMeta->maxCols,
.minRowsPerFileBlock = repo->config.minRowsPerFileBlock,
.maxRowsPerFileBlock = repo->config.maxRowsPerFileBlock,
.compress = repo->config.compression,
};
tsdbInitHelper(&rhelper, &helperCfg);
SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833);
ASSERT_NE(pFGroup, nullptr);
ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0);
SHelperTable htable = {
.uid = tCfg.tableId.uid,
.tid = tCfg.tableId.tid,
.sversion = tCfg.sversion
};
tsdbSetHelperTable(&rhelper, &htable, schema);
ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0);
ASSERT_EQ(tsdbLoadBlockData(&rhelper, 0, NULL), 0);
tsdbCloseRepo(pRepo); int k = 0;
} }
TEST(TsdbTest, DISABLED_openRepo) { TEST(TsdbTest, DISABLED_openRepo) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册