提交 cdce05d1 编写于 作者: L lichuang

[TD-3963]merge from develop

...@@ -319,7 +319,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); ...@@ -319,7 +319,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
SDataCols *tdFreeDataCols(SDataCols *pCols); SDataCols *tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset);
// ----------------- K-V data row structure // ----------------- K-V data row structure
/* /*
......
...@@ -441,30 +441,35 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) ...@@ -441,30 +441,35 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
pCols->numOfRows++; pCols->numOfRows++;
} }
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols); ASSERT(target->numOfCols == source->numOfCols);
int offset = 0;
if (pOffset == NULL) {
pOffset = &offset;
}
SDataCols *pTarget = NULL; SDataCols *pTarget = NULL;
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyFirst(source))) { // No overlap
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) { for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) { for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) { if (source->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows, dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints); target->maxPoints);
} }
} }
target->numOfRows++; target->numOfRows++;
} }
(*pOffset) += rowsToMerge;
} else { } else {
pTarget = tdDupDataCols(target, true); pTarget = tdDupDataCols(target, true);
if (pTarget == NULL) goto _err; if (pTarget == NULL) goto _err;
int iter1 = 0; int iter1 = 0;
int iter2 = 0; tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows,
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows,
pTarget->numOfRows + rowsToMerge); pTarget->numOfRows + rowsToMerge);
} }
......
...@@ -31,6 +31,8 @@ typedef struct { ...@@ -31,6 +31,8 @@ typedef struct {
#define TFS_UNDECIDED_ID -1 #define TFS_UNDECIDED_ID -1
#define TFS_PRIMARY_LEVEL 0 #define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0 #define TFS_PRIMARY_ID 0
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
// FS APIs ==================================== // FS APIs ====================================
typedef struct { typedef struct {
......
...@@ -409,6 +409,9 @@ void tsdbDecCommitRef(int vgId); ...@@ -409,6 +409,9 @@ void tsdbDecCommitRef(int vgId);
int tsdbSyncSend(void *pRepo, SOCKET socketFd); int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd); int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -6,6 +6,10 @@ AUX_SOURCE_DIRECTORY(src SRC) ...@@ -6,6 +6,10 @@ AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tsdb ${SRC}) ADD_LIBRARY(tsdb ${SRC})
TARGET_LINK_LIBRARIES(tsdb tfs common tutil) TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
IF (TD_TSDB_PLUGINS)
TARGET_LINK_LIBRARIES(tsdb tsdbPlugins)
ENDIF ()
IF (TD_LINUX) IF (TD_LINUX)
# Someone has no gtest directory, so comment it # Someone has no gtest directory, so comment it
# ADD_SUBDIRECTORY(tests) # ADD_SUBDIRECTORY(tests)
......
...@@ -29,10 +29,17 @@ typedef struct { ...@@ -29,10 +29,17 @@ typedef struct {
int64_t size; int64_t size;
} SKVRecord; } SKVRecord;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo); void *tsdbCommitData(STsdbRepo *pRepo);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
int tsdbApplyRtn(STsdbRepo *pRepo); int tsdbApplyRtn(STsdbRepo *pRepo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_COMMIT_QUEUE_H_ #ifndef _TD_TSDB_COMMIT_QUEUE_H_
#define _TD_TSDB_COMMIT_QUEUE_H_ #define _TD_TSDB_COMMIT_QUEUE_H_
int tsdbScheduleCommit(STsdbRepo *pRepo); typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T;
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */ #endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_
#ifdef __cplusplus
extern "C" {
#endif
void *tsdbCompactImpl(STsdbRepo *pRepo);
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_COMPACT_H_ */
\ No newline at end of file
...@@ -64,6 +64,8 @@ extern "C" { ...@@ -64,6 +64,8 @@ extern "C" {
#include "tsdbReadImpl.h" #include "tsdbReadImpl.h"
// Commit // Commit
#include "tsdbCommit.h" #include "tsdbCommit.h"
// Compact
#include "tsdbCompact.h"
// Commit Queue // Commit Queue
#include "tsdbCommitQueue.h" #include "tsdbCommitQueue.h"
// Main definitions // Main definitions
......
...@@ -51,7 +51,7 @@ typedef struct { ...@@ -51,7 +51,7 @@ typedef struct {
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5) #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static int tsdbCommitMeta(STsdbRepo *pRepo); static int tsdbCommitMeta(STsdbRepo *pRepo);
...@@ -72,7 +72,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid); ...@@ -72,7 +72,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2); static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitH *pCommih); static int tsdbWriteBlockInfo(SCommitH *pCommih);
static int tsdbWriteBlockIdx(SCommitH *pCommih);
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx); static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
...@@ -86,7 +85,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); ...@@ -86,7 +85,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update); TSKEY maxKey, int maxRows, int8_t update);
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
void *tsdbCommitData(STsdbRepo *pRepo) { void *tsdbCommitData(STsdbRepo *pRepo) {
if (pRepo->imem == NULL) { if (pRepo->imem == NULL) {
...@@ -117,6 +115,151 @@ _err: ...@@ -117,6 +115,151 @@ _err:
return NULL; return NULL;
} }
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet;
STsdbFS * pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
level = tsdbGetFidLevel(pSet->fid, pRtn);
tfsAllocDisk(level, &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
return -1;
}
tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
} else {
// On a correct level
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
return -1;
}
}
return 0;
}
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
SBlockIdx *pIdx) {
size_t nSupBlocks;
size_t nSubBlocks;
uint32_t tlen;
SBlockInfo *pBlkInfo;
int64_t offset;
SBlock * pBlock;
memset(pIdx, 0, sizeof(*pIdx));
nSupBlocks = taosArrayGetSize(pSupA);
nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA);
if (nSupBlocks <= 0) {
// No data (data all deleted)
return 0;
}
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
pBlkInfo = *ppBuf;
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
pBlkInfo->tid = TABLE_TID(pTable);
pBlkInfo->uid = TABLE_UID(pTable);
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
if (nSubBlocks > 0) {
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock));
for (int i = 0; i < nSupBlocks; i++) {
pBlock = pBlkInfo->blocks + i;
if (pBlock->numOfSubBlocks > 1) {
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
}
}
}
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) {
return -1;
}
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
// Set pIdx
pBlock = taosArrayGetLast(pSupA);
pIdx->tid = TABLE_TID(pTable);
pIdx->uid = TABLE_UID(pTable);
pIdx->hasLast = pBlock->last ? 1 : 0;
pIdx->maxKey = pBlock->keyLast;
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
pIdx->len = tlen;
pIdx->offset = (uint32_t)offset;
return 0;
}
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
SBlockIdx *pBlkIdx;
size_t nidx = taosArrayGetSize(pIdxA);
int tlen = 0, size;
int64_t offset;
if (nidx <= 0) {
// All data are deleted
pHeadf->info.offset = 0;
pHeadf->info.len = 0;
return 0;
}
for (size_t i = 0; i < nidx; i++) {
pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i);
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1;
void *ptr = POINTER_SHIFT(*ppBuf, tlen);
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
tlen += size;
}
tlen += sizeof(TSCKSUM);
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
return -1;
}
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM)));
pHeadf->info.offset = (uint32_t)offset;
pHeadf->info.len = tlen;
return 0;
}
// =================== Commit Meta Data // =================== Commit Meta Data
static int tsdbCommitMeta(STsdbRepo *pRepo) { static int tsdbCommitMeta(STsdbRepo *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
...@@ -446,7 +589,8 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { ...@@ -446,7 +589,8 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
} }
} }
if (tsdbWriteBlockIdx(pCommith) < 0) { if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
0) {
tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
tsdbCloseCommitFile(pCommith, true); tsdbCloseCommitFile(pCommith, true);
// revert the file change // revert the file change
...@@ -754,23 +898,21 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { ...@@ -754,23 +898,21 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
} }
} }
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
bool isSuper) { bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) {
STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData; SBlockData *pBlockData;
int64_t offset = 0; int64_t offset = 0;
STable * pTable = TSDB_COMMIT_TABLE(pCommith);
int rowsToWrite = pDataCols->numOfRows; int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
// Make buffer space // Make buffer space
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
return -1; return -1;
} }
pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith); pBlockData = (SBlockData *)(*ppBuf);
// Get # of cols not all NULL(not including key column) // Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0; int nColsNotAllNull = 0;
...@@ -816,23 +958,23 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo ...@@ -816,23 +958,23 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
void * tptr; void * tptr;
// Make room // Make room
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
return -1; return -1;
} }
pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith); pBlockData = (SBlockData *)(*ppBuf);
pBlockCol = pBlockData->cols + tcol; pBlockCol = pBlockData->cols + tcol;
tptr = POINTER_SHIFT(pBlockData, lsize); tptr = POINTER_SHIFT(pBlockData, lsize);
if (pCfg->compression == TWO_STAGE_COMP && if (pCfg->compression == TWO_STAGE_COMP &&
tsdbMakeRoom((void **)(&TSDB_COMMIT_COMP_BUF(pCommith)), tlen + COMP_OVERFLOW_BYTES) < 0) { tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
return -1; return -1;
} }
// Compress or just copy // Compress or just copy
if (pCfg->compression) { if (pCfg->compression) {
flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
tlen + COMP_OVERFLOW_BYTES, pCfg->compression, tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf,
TSDB_COMMIT_COMP_BUF(pCommith), tlen + COMP_OVERFLOW_BYTES); tlen + COMP_OVERFLOW_BYTES);
} else { } else {
flen = tlen; flen = tlen;
memcpy(tptr, pDataCol->pData, flen); memcpy(tptr, pDataCol->pData, flen);
...@@ -888,117 +1030,33 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo ...@@ -888,117 +1030,33 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
return 0; return 0;
} }
static int tsdbWriteBlockInfo(SCommitH *pCommih) { static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); bool isSuper) {
SBlockIdx blkIdx; return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast,
STable * pTable = TSDB_COMMIT_TABLE(pCommih); isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
SBlock * pBlock; (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
size_t nSupBlocks; }
size_t nSubBlocks;
uint32_t tlen;
SBlockInfo *pBlkInfo;
int64_t offset;
nSupBlocks = taosArrayGetSize(pCommih->aSupBlk);
nSubBlocks = taosArrayGetSize(pCommih->aSubBlk);
if (nSupBlocks <= 0) {
// No data (data all deleted)
return 0;
}
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
// Write SBlockInfo part
if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1;
pBlkInfo = TSDB_COMMIT_BUF(pCommih);
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
pBlkInfo->tid = TABLE_TID(pTable);
pBlkInfo->uid = TABLE_UID(pTable);
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pCommih->aSupBlk, 0), nSupBlocks * sizeof(SBlock));
if (nSubBlocks > 0) {
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pCommih->aSubBlk, 0), nSubBlocks * sizeof(SBlock));
for (int i = 0; i < nSupBlocks; i++) {
pBlock = pBlkInfo->blocks + i;
if (pBlock->numOfSubBlocks > 1) {
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
}
}
}
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < 0) {
return -1;
}
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
// Set blkIdx
pBlock = taosArrayGet(pCommih->aSupBlk, nSupBlocks - 1);
blkIdx.tid = TABLE_TID(pTable);
blkIdx.uid = TABLE_UID(pTable);
blkIdx.hasLast = pBlock->last ? 1 : 0;
blkIdx.maxKey = pBlock->keyLast;
blkIdx.numOfBlocks = (uint32_t)nSupBlocks;
blkIdx.len = tlen;
blkIdx.offset = (uint32_t)offset;
ASSERT(blkIdx.numOfBlocks > 0); static int tsdbWriteBlockInfo(SCommitH *pCommih) {
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SBlockIdx blkIdx;
STable * pTable = TSDB_COMMIT_TABLE(pCommih);
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))),
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; &blkIdx) < 0) {
return -1; return -1;
} }
return 0; if (blkIdx.numOfBlocks == 0) {
}
static int tsdbWriteBlockIdx(SCommitH *pCommih) {
SBlockIdx *pBlkIdx = NULL;
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
size_t nidx = taosArrayGetSize(pCommih->aBlkIdx);
int tlen = 0, size = 0;
int64_t offset = 0;
if (nidx <= 0) {
// All data are deleted
pHeadf->info.offset = 0;
pHeadf->info.len = 0;
return 0; return 0;
} }
for (size_t i = 0; i < nidx; i++) { if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen + size) < 0) return -1;
void *ptr = POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen);
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
tlen += size;
}
tlen += sizeof(TSCKSUM);
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen) < 0) return -1;
taosCalcChecksumAppend(0, (uint8_t *)TSDB_COMMIT_BUF(pCommih), tlen);
if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < tlen) {
tsdbError("vgId:%d failed to write block index part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih),
TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno));
return -1; return -1;
} }
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen - sizeof(TSCKSUM)));
pHeadf->info.offset = (uint32_t)offset;
pHeadf->info.len = tlen;
return 0; return 0;
} }
...@@ -1454,45 +1512,3 @@ int tsdbApplyRtn(STsdbRepo *pRepo) { ...@@ -1454,45 +1512,3 @@ int tsdbApplyRtn(STsdbRepo *pRepo) {
return 0; return 0;
} }
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet;
STsdbFS * pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
level = tsdbGetFidLevel(pSet->fid, pRtn);
tfsAllocDisk(level, &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
return -1;
}
tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
} else {
// On a correct level
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
return -1;
}
}
return 0;
}
\ No newline at end of file
...@@ -26,8 +26,9 @@ typedef struct { ...@@ -26,8 +26,9 @@ typedef struct {
} SCommitQueue; } SCommitQueue;
typedef struct { typedef struct {
TSDB_REQ_T req;
STsdbRepo *pRepo; STsdbRepo *pRepo;
} SCommitReq; } SReq;
static void *tsdbLoopCommit(void *arg); static void *tsdbLoopCommit(void *arg);
...@@ -90,16 +91,17 @@ void tsdbDestroyCommitQueue() { ...@@ -90,16 +91,17 @@ void tsdbDestroyCommitQueue() {
pthread_mutex_destroy(&(pQueue->lock)); pthread_mutex_destroy(&(pQueue->lock));
} }
int tsdbScheduleCommit(STsdbRepo *pRepo) { int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
SCommitQueue *pQueue = &tsCommitQueue; SCommitQueue *pQueue = &tsCommitQueue;
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq)); SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq));
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
((SCommitReq *)pNode->data)->pRepo = pRepo; ((SReq *)pNode->data)->req = req;
((SReq *)pNode->data)->pRepo = pRepo;
pthread_mutex_lock(&(pQueue->lock)); pthread_mutex_lock(&(pQueue->lock));
...@@ -154,7 +156,7 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -154,7 +156,7 @@ static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue; SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL; SListNode * pNode = NULL;
STsdbRepo * pRepo = NULL; STsdbRepo * pRepo = NULL;
bool config_changed = false; TSDB_REQ_T req;
while (true) { while (true) {
pthread_mutex_lock(&(pQueue->lock)); pthread_mutex_lock(&(pQueue->lock));
...@@ -175,18 +177,19 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -175,18 +177,19 @@ static void *tsdbLoopCommit(void *arg) {
pthread_mutex_unlock(&(pQueue->lock)); pthread_mutex_unlock(&(pQueue->lock));
pRepo = ((SCommitReq *)pNode->data)->pRepo; req = ((SReq *)pNode->data)->req;
pRepo = ((SReq *)pNode->data)->pRepo;
// check if need to apply new config if (req == COMMIT_REQ) {
config_changed = pRepo->config_changed; tsdbCommitData(pRepo);
if (pRepo->config_changed) { } else if (req == COMPACT_REQ) {
tsdbCompactImpl(pRepo);
} else if (req == COMMIT_CONFIG_REQ) {
ASSERT(pRepo->config_changed);
tsdbApplyRepoConfig(pRepo); tsdbApplyRepoConfig(pRepo);
}
if (config_changed && pRepo->imem == NULL) {
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
} else { } else {
tsdbCommitData(pRepo); ASSERT(0);
} }
listNodeFree(pNode); listNodeFree(pNode);
......
...@@ -11,4 +11,12 @@ ...@@ -11,4 +11,12 @@
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
\ No newline at end of file #include "tsdb.h"
#ifndef _TSDB_PLUGINS
int tsdbCompact(STsdbRepo *pRepo) { return 0; }
void *tsdbCompactImpl(STsdbRepo *pRepo) { return NULL; }
#endif
\ No newline at end of file
...@@ -280,7 +280,7 @@ int tsdbAsyncCommitConfig(STsdbRepo* pRepo) { ...@@ -280,7 +280,7 @@ int tsdbAsyncCommitConfig(STsdbRepo* pRepo) {
} }
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
tsdbScheduleCommit(pRepo); tsdbScheduleCommit(pRepo, COMMIT_CONFIG_REQ);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
return 0; return 0;
...@@ -303,7 +303,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -303,7 +303,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem; pRepo->imem = pRepo->mem;
pRepo->mem = NULL; pRepo->mem = NULL;
tsdbScheduleCommit(pRepo); tsdbScheduleCommit(pRepo, COMMIT_REQ);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
return 0; return 0;
......
...@@ -258,7 +258,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -258,7 +258,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1;
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1; if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1;
} }
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
...@@ -284,7 +284,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, ...@@ -284,7 +284,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1;
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1; if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1;
} }
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import tdDnodes
from datetime import datetime
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
tdSql.query('show databases')
tdSql.checkData(0,15,0)
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath + "/build/bin/"
#write 5M rows into db, then restart to force the data move into disk.
#create 500 tables
os.system("%staosdemo -f tools/taosdemoAllTest/insert_5M_rows.json -y " % binPath)
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.execute('use db')
#prepare to query 500 tables last_row()
tableName = []
for i in range(500):
tableName.append(f"stb_{i}")
tdSql.execute('use db')
lastRow_Off_start = datetime.now()
slow = 0 #count time where lastRow on is slower
for i in range(5):
#switch lastRow to off and check
tdSql.execute('alter database db cachelast 0')
tdSql.query('show databases')
tdSql.checkData(0,15,0)
#run last_row(*) query 500 times
for i in range(500):
tdSql.execute(f'SELECT LAST_ROW(*) FROM {tableName[i]}')
lastRow_Off_end = datetime.now()
tdLog.debug(f'time used:{lastRow_Off_end-lastRow_Off_start}')
#switch lastRow to on and check
tdSql.execute('alter database db cachelast 1')
tdSql.query('show databases')
tdSql.checkData(0,15,1)
#run last_row(*) query 500 times
tdSql.execute('use db')
lastRow_On_start = datetime.now()
for i in range(500):
tdSql.execute(f'SELECT LAST_ROW(*) FROM {tableName[i]}')
lastRow_On_end = datetime.now()
tdLog.debug(f'time used:{lastRow_On_end-lastRow_On_start}')
#check which one used more time
if (lastRow_Off_end-lastRow_Off_start > lastRow_On_end-lastRow_On_start):
pass
else:
slow += 1
tdLog.debug(slow)
if slow > 1: #tolerance for the first time
tdLog.exit('lastRow hot alter failed')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -336,5 +336,5 @@ python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py ...@@ -336,5 +336,5 @@ python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py
python3 ./test.py -f tag_lite/drop_auto_create.py python3 ./test.py -f tag_lite/drop_auto_create.py
python3 test.py -f insert/insert_before_use_db.py python3 test.py -f insert/insert_before_use_db.py
python3 test.py -f alter/alter_cacheLastRow.py
#======================p4-end=============== #======================p4-end===============
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"interlace_rows": 100,
"num_of_records_per_req": 100,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 3650,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 500,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"batch_create_tbl_num": 20,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 10000,
"childtable_limit": 10,
"childtable_offset":100,
"interlace_rows": 0,
"insert_interval":0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 10,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"}],
"tags": [{"type": "TINYINT", "count":2}]
}]
}]
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册