提交 779b12b2 编写于 作者: H Hongze Cheng

partial work

上级 22d9a807
......@@ -278,7 +278,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols);
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
void tdFreeDataCols(SDataCols *pCols);
void *tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
......
......@@ -337,12 +337,13 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
return 0;
}
void tdFreeDataCols(SDataCols *pCols) {
SDataCols *tdFreeDataCols(SDataCols *pCols) {
if (pCols) {
tfree(pCols->buf);
tfree(pCols->cols);
free(pCols);
}
return NULL;
}
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
......
......@@ -35,7 +35,7 @@ void taosDumpMemoryLeak();
void * taosTMalloc(size_t size);
void * taosTCalloc(size_t nmemb, size_t size);
void * taosTRealloc(void *ptr, size_t size);
void taosTZfree(void *ptr);
void * taosTZfree(void *ptr);
size_t taosTSizeof(void *ptr);
void taosTMemset(void *ptr, int c);
......
......@@ -512,8 +512,9 @@ void * taosTRealloc(void *ptr, size_t size) {
return (void *)((char *)tptr + sizeof(size_t));
}
void taosTZfree(void *ptr) {
void* taosTZfree(void* ptr) {
if (ptr) {
free((void *)((char *)ptr - sizeof(size_t)));
free((void*)((char*)ptr - sizeof(size_t)));
}
return NULL;
}
\ No newline at end of file
......@@ -365,6 +365,7 @@ typedef struct {
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(&((f)->f))
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id);
......@@ -632,6 +633,7 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
#include "tsdbReadImpl.h"
#if 0
// ================= tsdbRWHelper.c
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
......@@ -730,6 +732,8 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
}
}
#endif
// ================= tsdbScan.c
typedef struct {
SFileGroup fGroup;
......
......@@ -78,23 +78,64 @@ typedef struct {
struct SReadH {
STsdbRepo * pRepo;
SDFileSet * pSet;
SDFileSet rSet; // File set
SArray * aBlkIdx;
STable * pTable; // Table info
SBlockIdx * pBlkIdx;
int cidx;
STable * pTable;
SBlockIdx * pBlockIdx;
SBlockInfo *pBlkInfo;
SBlockData *pBlkData;
SBlockData *pBlkData; // Block info
SDataCols * pDCols[2];
void * pBuf;
void * pCBuf;
};
#define TSDB_READ_REPO(rh) (rh)->pRepo
#define TSDB_READ_FSET(rh) (rh)->pSet
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) &((rh)->rSet)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_BUF(rh) (rh)->pBuf
#define TSDB_READ_COMP_BUF(rh) (rh)->pCBuf
#define TSDB_READ_FSET_IS_SET(rh) ((rh)->pSet != NULL)
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo);
void tsdbDestroyReadH(SReadH *pReadh);
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
void tsdbCloseAndUnsetFSet(SReadH *pReadh);
int tsdbLoadBlockIdx(SReadH *pReadh);
int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds,
const int numOfColsIds);
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols);
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
void * pBuf = *ppBuf;
size_t tsize = taosTSizeof(pBuf);
if (tsize < size) {
if (tsize == 0) tsize = 1024;
while (tsize < size) {
tsize *= 2;
}
*ppBuf = taosTRealloc(pBuf, tsize);
if (*ppBuf == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
return 0;
}
#ifdef __cplusplus
}
......
......@@ -37,6 +37,12 @@ typedef struct {
SDataCols * pDataCols;
} SCommitH;
#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh))
#define TSDB_COMMIT_WRITE_FSET(ch) ((ch)->pWSet)
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
void *tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbStartCommit(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno));
......@@ -510,5 +516,150 @@ static int tsdbAppendCommit(SCommitIter *pIter, TSKEY keyEnd) {
static int tsdbMergeCommit(SCommitIter *pIter, SBlock *pBlock, TSKEY keyEnd) {
// TODO
return 0;
}
static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer);
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile),
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull;
if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
continue;
}
memset(pCompCol, 0, sizeof(*pCompCol));
pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type;
if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
}
nColsNotAllNull++;
}
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
// Compress the data if neccessary
int tcol = 0;
int32_t toffset = 0;
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
int32_t lsize = tsize;
int32_t keyLen = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pCompCol = pCompData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue;
void *tptr = POINTER_SHIFT(pCompData, lsize);
int32_t flen = 0; // final length
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
if (pCfg->compression) {
if (pCfg->compression == TWO_STAGE_COMP) {
pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
if (pHelper->compBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
}
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))(
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, (int32_t)taosTSizeof(pHelper->pBuffer) - lsize,
pCfg->compression, pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer));
} else {
flen = tlen;
memcpy(tptr, pDataCol->pData, flen);
}
// Add checksum
ASSERT(flen > 0);
flen += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
pFile->info.magic =
taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
if (ncol != 0) {
pCompCol->offset = toffset;
pCompCol->len = flen;
tcol++;
} else {
keyLen = flen;
}
toffset += flen;
lsize += flen;
}
pCompData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = pHelper->tableInfo.uid;
pCompData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);
pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)),
sizeof(TSCKSUM));
// Write the whole block to file
if (taosWrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize,
TSDB_FILE_NAME(pFile), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// Update pBlock membership vairables
pBlock->last = isLast;
pBlock->offset = offset;
pBlock->algorithm = pCfg->compression;
pBlock->numOfRows = rowsToWrite;
pBlock->len = lsize;
pBlock->keyLen = keyLen;
pBlock->numOfSubBlocks = isSuper ? 1 : 0;
pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, TSDB_FILE_NAME(pFile), (int64_t)(pBlock->offset),
(int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
pFile->info.size += pBlock->len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
return 0;
_err:
return -1;
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
// TODO
return 0;
}
\ No newline at end of file
此差异已折叠。
......@@ -132,7 +132,7 @@ void taosArrayClear(SArray* pArray);
* destroy array list
* @param pArray
*/
void taosArrayDestroy(SArray* pArray);
void* taosArrayDestroy(SArray* pArray);
/**
*
......
......@@ -189,13 +189,13 @@ void taosArrayClear(SArray* pArray) {
pArray->size = 0;
}
void taosArrayDestroy(SArray* pArray) {
if (pArray == NULL) {
return;
void* taosArrayDestroy(SArray* pArray) {
if (pArray) {
free(pArray->pData);
free(pArray);
}
free(pArray->pData);
free(pArray);
return NULL;
}
void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册