diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 1f46c68abcffa57c246c561b4c64b931d3d5c7ea..207ff4dbfd1aa1f34b32ef93b7f819651d07f7c8 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -105,11 +105,25 @@ SDataRow tdDataRowDup(SDataRow row); // ----------------- Data column structure typedef struct SDataCol { - int64_t len; - char data[]; + int32_t len; + void * pData; } SDataCol; -void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter); +typedef struct { + TSKEY firstKey; + TSKEY lastKey; + int numOfPoints; + int numOfCols; + void * buf; + SDataCol cols[]; +} SDataCols; + +#define keyCol(cols) (&((cols)->cols[0])) // Key column + +SDataCols *tdNewDataCols(STSchema *pSchema, int nRows); +void tdFreeDataCols(SDataCols *pCols); +void tdResetDataCols(SDataCols *pCols); +void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols, STSchema *pSchema); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 04826e43ac98852495b5951aaa0ba66315bf2480..3c692f9eba8e3528b8238f729051210c24e2d055 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -294,14 +294,55 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } -void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) { - int row = *iter; +SDataCols *tdNewDataCols(STSchema *pSchema, int nRows) { + int nCols = schemaNCols(pSchema); + + SDataCols *pInfo = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * nCols); + if (pInfo == NULL) return NULL; + + pInfo->numOfCols = nCols; + pInfo->firstKey = INT64_MIN; + pInfo->lastKey = INT64_MAX; + pInfo->buf = malloc(tdMaxRowBytesFromSchema(pSchema) * nRows); + if (pInfo->buf == NULL) { + free(pInfo); + return NULL; + } - for (int i = 0; i < schemaNCols(pSchema); i++) { - // TODO + pInfo->cols[0].pData = pInfo->buf; + for (int i = 1; i < nCols; i++) { + pInfo->cols[i].pData = (char *)(pInfo->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * nRows; } - *iter = row + 1; + return pInfo; +} + +void tdFreeDataCols(SDataCols *pCols) { + if (pCols) { + if (pCols->buf) free(pCols->buf); + free(pCols); + } +} + +void tdResetDataCols(SDataCols *pCols) { + pCols->firstKey = INT64_MAX; + pCols->lastKey = INT64_MIN; + pCols->numOfPoints = 0; + for (int i = 0; i < pCols->numOfCols; i++) { + pCols->cols[i].len = 0; + } +} + +void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols, STSchema *pSchema) { + TSKEY key = dataRowKey(row); + if (pCols->numOfPoints == 0) pCols->firstKey = key; + pCols->lastKey = key; + for (int i = 0; i < pCols->numOfCols; i++) { + SDataCol *pCol = pCols->cols + i; + memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, colOffset(schemaColAt(pSchema, i))), + colBytes(schemaColAt(pSchema, i))); + pCol->len += colBytes(schemaColAt(pSchema, i)); + } } /** diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index af3a923d904d62c5fc3af8d8f4ca8dfad64c7282..7b1836378d18ffb7cc538bb3fd251c100992a893 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -84,7 +84,7 @@ static int tsdbOpenMetaFile(char *tsdbDir); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); -static void * tsdbCommitToFile(void *arg); +static void * tsdbCommitData(void *arg); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -327,7 +327,7 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { pRepo->tsdbCache->curBlock = NULL; // TODO: here should set as detached or use join for memory leak - pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); + pthread_create(&(pRepo->commitThread), NULL, tsdbCommitData, (void *)repo); tsdbUnLockRepo(repo); return 0; @@ -816,7 +816,7 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) } // Commit to file -static void *tsdbCommitToFile(void *arg) { +static void *tsdbCommitData(void *arg) { // TODO printf("Starting to commit....\n"); STsdbRepo * pRepo = (STsdbRepo *)arg; @@ -894,4 +894,26 @@ static void *tsdbCommitToFile(void *arg) { tsdbUnLockRepo(arg); return NULL; +} + +static int tsdbCommitToFile(STsdbRepo *pRepo, SSkipListIterator **iters, int fid) { + STsdbMeta * pMeta = pRepo->tsdbMeta; + STsdbFileH *pFileH = pRepo->tsdbFileH; + STsdbCfg * pCfg = &pRepo->config; + TSKEY minKey = 0, maxKey = 0; + tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + + for (int tid = 0; tid < pCfg->maxTables; tid++) { + STable *pTable = pMeta->tables[tid]; + SSkipListIterator *pIter = iters[tid]; + + if (pIter == NULL) continue; + + // Read data + // while () { + + // } + } + + return 0; } \ No newline at end of file