提交 9b0bfc89 编写于 作者: H Hongze Cheng

make SDataCols buffer flexible

上级 9ca99fa7
...@@ -235,11 +235,11 @@ typedef struct { ...@@ -235,11 +235,11 @@ typedef struct {
int maxPoints; // max number of points int maxPoints; // max number of points
int bufSize; int bufSize;
int numOfRows; int numOfRows;
int numOfCols; // Total number of cols int numOfCols; // Total number of cols
int sversion; // TODO: set sversion int sversion; // TODO: set sversion
void * buf; void * buf;
SDataCol cols[]; SDataCol *cols;
} SDataCols; } SDataCols;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define keyCol(pCols) (&((pCols)->cols[0])) // Key column
...@@ -249,13 +249,14 @@ typedef struct { ...@@ -249,13 +249,14 @@ typedef struct {
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols); void tdResetDataCols(SDataCols *pCols);
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
void tdFreeDataCols(SDataCols *pCols); void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows); void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
// ----------------- K-V data row structure // ----------------- K-V data row structure
/* /*
......
...@@ -310,9 +310,15 @@ void dataColSetOffset(SDataCol *pCol, int nEle) { ...@@ -310,9 +310,15 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
} }
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
if (pCols == NULL) return NULL; if (pCols == NULL) return NULL;
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
if (pCols->cols == NULL) {
tdFreeDataCols(pCols);
return NULL;
}
pCols->maxRowSize = maxRowSize; pCols->maxRowSize = maxRowSize;
pCols->maxCols = maxCols; pCols->maxCols = maxCols;
pCols->maxPoints = maxRows; pCols->maxPoints = maxRows;
...@@ -320,15 +326,27 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { ...@@ -320,15 +326,27 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols->buf = malloc(pCols->bufSize); pCols->buf = malloc(pCols->bufSize);
if (pCols->buf == NULL) { if (pCols->buf == NULL) {
free(pCols); tdFreeDataCols(pCols);
return NULL; return NULL;
} }
return pCols; return pCols;
} }
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
// assert(schemaNCols(pSchema) <= pCols->numOfCols); if (schemaNCols(pSchema) > pCols->maxCols) {
pCols->maxCols = schemaNCols(pSchema);
pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
if (pCols->cols == NULL) return -1;
}
if (schemaTLen(pSchema) > pCols->maxRowSize) {
pCols->maxRowSize = schemaTLen(pSchema);
pCols->bufSize = schemaTLen(pSchema) * pCols->maxPoints;
pCols->buf = realloc(pCols->buf, pCols->bufSize);
if (pCols->buf == NULL) return -1;
}
tdResetDataCols(pCols); tdResetDataCols(pCols);
pCols->numOfCols = schemaNCols(pSchema); pCols->numOfCols = schemaNCols(pSchema);
...@@ -337,11 +355,14 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { ...@@ -337,11 +355,14 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints);
ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize); ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize);
} }
return 0;
} }
void tdFreeDataCols(SDataCols *pCols) { void tdFreeDataCols(SDataCols *pCols) {
if (pCols) { if (pCols) {
tfree(pCols->buf); tfree(pCols->buf);
tfree(pCols->cols);
free(pCols); free(pCols);
} }
} }
......
...@@ -483,7 +483,7 @@ void tsdbDestroyHelper(SRWHelper* pHelper); ...@@ -483,7 +483,7 @@ void tsdbDestroyHelper(SRWHelper* pHelper);
void tsdbResetHelper(SRWHelper* pHelper); void tsdbResetHelper(SRWHelper* pHelper);
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError); int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
int tsdbWriteCompInfo(SRWHelper* pHelper); int tsdbWriteCompInfo(SRWHelper* pHelper);
......
...@@ -788,7 +788,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -788,7 +788,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
for (int i = 1; i < pMeta->maxTables; i++) { for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue; if (pTable == NULL) continue;
tsdbSetHelperTable(&rhelper, pTable, pRepo); if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err;
SCompIdx *pIdx = &(rhelper.curCompIdx); SCompIdx *pIdx = &(rhelper.curCompIdx);
if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey;
......
...@@ -610,10 +610,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -610,10 +610,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
taosRLockLatch(&(pIter->pTable->latch)); taosRLockLatch(&(pIter->pTable->latch));
tsdbSetHelperTable(pHelper, pIter->pTable, pRepo); if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
if (pIter->pIter != NULL) { if (pIter->pIter != NULL) {
tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)); if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch)); taosRUnLockLatch(&(pIter->pTable->latch));
......
...@@ -262,7 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -262,7 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
return 0; return 0;
} }
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { int tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)); ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));
// Clear members and state used by previous table // Clear members and state used by previous table
...@@ -273,8 +273,15 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { ...@@ -273,8 +273,15 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
pHelper->tableInfo.uid = pTable->tableId.uid; pHelper->tableInfo.uid = pTable->tableId.uid;
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
tdInitDataCols(pHelper->pDataCols[0], pSchema); if (tdInitDataCols(pHelper->pDataCols[0], pSchema) < 0) {
tdInitDataCols(pHelper->pDataCols[1], pSchema); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (tdInitDataCols(pHelper->pDataCols[1], pSchema) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (pHelper->idxH.numOfIdx > 0) { if (pHelper->idxH.numOfIdx > 0) {
while (true) { while (true) {
...@@ -309,6 +316,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { ...@@ -309,6 +316,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
helperSetState(pHelper, TSDB_HELPER_TABLE_SET); helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1)); ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1));
return 0;
} }
int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册