提交 7e5b12f7 编写于 作者: H hzcheng

TD-34

上级 922134aa
...@@ -105,11 +105,25 @@ SDataRow tdDataRowDup(SDataRow row); ...@@ -105,11 +105,25 @@ SDataRow tdDataRowDup(SDataRow row);
// ----------------- Data column structure // ----------------- Data column structure
typedef struct SDataCol { typedef struct SDataCol {
int64_t len; int32_t len;
char data[]; void * pData;
} SDataCol; } SDataCol;
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter); typedef struct {
TSKEY firstKey;
TSKEY lastKey;
int numOfPoints;
int numOfCols;
void * buf;
SDataCol cols[];
} SDataCols;
#define keyCol(cols) (&((cols)->cols[0])) // Key column
SDataCols *tdNewDataCols(STSchema *pSchema, int nRows);
void tdFreeDataCols(SDataCols *pCols);
void tdResetDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols, STSchema *pSchema);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -294,14 +294,55 @@ SDataRow tdDataRowDup(SDataRow row) { ...@@ -294,14 +294,55 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow; return trow;
} }
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) { SDataCols *tdNewDataCols(STSchema *pSchema, int nRows) {
int row = *iter; int nCols = schemaNCols(pSchema);
SDataCols *pInfo = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * nCols);
if (pInfo == NULL) return NULL;
pInfo->numOfCols = nCols;
pInfo->firstKey = INT64_MIN;
pInfo->lastKey = INT64_MAX;
pInfo->buf = malloc(tdMaxRowBytesFromSchema(pSchema) * nRows);
if (pInfo->buf == NULL) {
free(pInfo);
return NULL;
}
for (int i = 0; i < schemaNCols(pSchema); i++) { pInfo->cols[0].pData = pInfo->buf;
// TODO for (int i = 1; i < nCols; i++) {
pInfo->cols[i].pData = (char *)(pInfo->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * nRows;
}
return pInfo;
}
void tdFreeDataCols(SDataCols *pCols) {
if (pCols) {
if (pCols->buf) free(pCols->buf);
free(pCols);
}
}
void tdResetDataCols(SDataCols *pCols) {
pCols->firstKey = INT64_MAX;
pCols->lastKey = INT64_MIN;
pCols->numOfPoints = 0;
for (int i = 0; i < pCols->numOfCols; i++) {
pCols->cols[i].len = 0;
} }
}
*iter = row + 1; void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols, STSchema *pSchema) {
TSKEY key = dataRowKey(row);
if (pCols->numOfPoints == 0) pCols->firstKey = key;
pCols->lastKey = key;
for (int i = 0; i < pCols->numOfCols; i++) {
SDataCol *pCol = pCols->cols + i;
memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, colOffset(schemaColAt(pSchema, i))),
colBytes(schemaColAt(pSchema, i)));
pCol->len += colBytes(schemaColAt(pSchema, i));
}
} }
/** /**
......
...@@ -84,7 +84,7 @@ static int tsdbOpenMetaFile(char *tsdbDir); ...@@ -84,7 +84,7 @@ static int tsdbOpenMetaFile(char *tsdbDir);
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitToFile(void *arg); static void * tsdbCommitData(void *arg);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...@@ -327,7 +327,7 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { ...@@ -327,7 +327,7 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
pRepo->tsdbCache->curBlock = NULL; pRepo->tsdbCache->curBlock = NULL;
// TODO: here should set as detached or use join for memory leak // TODO: here should set as detached or use join for memory leak
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); pthread_create(&(pRepo->commitThread), NULL, tsdbCommitData, (void *)repo);
tsdbUnLockRepo(repo); tsdbUnLockRepo(repo);
return 0; return 0;
...@@ -816,7 +816,7 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) ...@@ -816,7 +816,7 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables)
} }
// Commit to file // Commit to file
static void *tsdbCommitToFile(void *arg) { static void *tsdbCommitData(void *arg) {
// TODO // TODO
printf("Starting to commit....\n"); printf("Starting to commit....\n");
STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbRepo * pRepo = (STsdbRepo *)arg;
...@@ -895,3 +895,25 @@ static void *tsdbCommitToFile(void *arg) { ...@@ -895,3 +895,25 @@ static void *tsdbCommitToFile(void *arg) {
return NULL; return NULL;
} }
static int tsdbCommitToFile(STsdbRepo *pRepo, SSkipListIterator **iters, int fid) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid];
if (pIter == NULL) continue;
// Read data
// while () {
// }
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册