提交 4900f23a 编写于 作者: H hzcheng

add more code

上级 0566dfbd
......@@ -58,10 +58,24 @@ void tdDataRowReset(SDataRow row);
*/
typedef void *SDataRows;
#define TD_DATA_ROWS_HEAD_LEN sizeof(int32_t)
#define dataRowsLen(rs) (*(int32_t *)(rs))
#define dataRowsSetLen(rs, l) (dataRowsLen(rs) = (l))
#define dataRowsInit(rs) dataRowsSetLen(rs, sizeof(int32_t))
void tdDataRowsAppendRow(SDataRows rows, SDataRow row);
// Data rows iterator
typedef struct {
int32_t totalLen;
int32_t len;
SDataRow row;
} SDataRowsIter;
void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter);
SDataRow tdDataRowsNext(SDataRowsIter *pIter);
/* Data column definition
* +---------+---------+-----------------------+
* | int32_t | int32_t | |
......@@ -80,12 +94,6 @@ typedef char *SDataCol;
*/
typedef char *SDataCols;
typedef struct {
int32_t rowCounter;
int32_t totalRows;
SDataRow row;
} SDataRowsIter;
// ----------------- Data column structure
// ---- operation on SDataRow;
......@@ -110,11 +118,6 @@ void tdFreeSDataRow(SDataRow rdata);
#define TD_DATACOLS_LEN(pDataCols) (*(int32_t *)(pDataCols))
#define TD_DATACOLS_NPOINTS(pDataCols) (*(int32_t *)(pDataCols + sizeof(int32_t)))
// ---- operation on SDataRowIter
void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter);
int32_t tdRdataIterEnd(SDataRowsIter *pIter);
void tdRdataIterNext(SDataRowsIter *pIter);
#ifdef __cplusplus
}
#endif
......
......@@ -58,7 +58,7 @@ int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixO
case TD_DATATYPE_DOUBLE:
case TD_DATATYPE_TIMESTAMP:
memcpy(dataRowIdx(row, pCol->offset + sizeof(int32_t)), value, rowDataLen[pCol->type]);
if (dataRowLen(row) > suffixOffset + sizeof(int32_t))
if (dataRowLen(row) < suffixOffset + sizeof(int32_t))
dataRowSetLen(row, dataRowLen(row) + rowDataLen[pCol->type]);
break;
case TD_DATATYPE_VARCHAR:
......@@ -81,26 +81,47 @@ int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixO
void tdDataRowCpy(void *dst, SDataRow row) { memcpy(dst, row, dataRowLen(row)); }
void tdDataRowReset(SDataRow row) { dataRowSetLen(row, sizeof(int32_t)); }
// ------ Codes below should be refactored
SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; }
void tdFreeSDataRow(SDataRow rdata) {
if (rdata == NULL) return;
free(rdata);
void tdDataRowsAppendRow(SDataRows rows, SDataRow row) {
tdDataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row);
dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row));
}
// Initialize the iterator
void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter) {
pIter->totalRows = TD_DATAROWS_ROWS(rows);
pIter->rowCounter = 1;
pIter->row = TD_DATAROWS_DATA(rows);
if (pIter == NULL) return;
pIter->totalLen = dataRowsLen(rows);
if (pIter->totalLen == TD_DATA_ROWS_HEAD_LEN) {
pIter->row = NULL;
return;
}
pIter->row = (SDataRow)((char *)rows + TD_DATA_ROWS_HEAD_LEN);
pIter->len = TD_DATA_ROWS_HEAD_LEN + dataRowLen(pIter->row);
}
void tdRdataIterNext(SDataRowsIter *pIter) {
pIter->rowCounter++;
pIter->row = pIter->row + TD_DATAROW_LEN(pIter->row);
// Get the next row in Rows
SDataRow tdDataRowsNext(SDataRowsIter *pIter) {
SDataRow row = pIter->row;
if (row == NULL) return NULL;
if (pIter->len >= pIter->totalLen) {
pIter->row = NULL;
} else {
pIter->row = (char *)row + dataRowLen(row);
pIter->len += dataRowLen(row);
}
return row;
}
int32_t tdRdataIterEnd(SDataRowsIter *pIter) { return pIter->rowCounter >= pIter->totalRows; }
// ------ Codes below should be refactored
SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; }
void tdFreeSDataRow(SDataRow rdata) {
if (rdata == NULL) return;
free(rdata);
}
/**
* Copy it
......
......@@ -30,5 +30,8 @@ int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; }
void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) {
// TODO: implement here
return NULL;
void *ptr = malloc(bytes);
if (ptr == NULL) return NULL;
return ptr;
}
\ No newline at end of file
......@@ -409,14 +409,16 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
int32_t level = 0;
int32_t headSize = 0;
tSkipListRandNodeInfo(pTable->content.pData, &level, &headSize);
// Copy row into the memory
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + TD_DATAROW_LEN(row));
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row));
if (pNode == NULL) {
// TODO: deal with allocate failure
}
pNode->level = level;
tdSDataRowCpy(row, SL_GET_NODE_DATA(pNode));
tdDataRowCpy(SL_GET_NODE_DATA(pNode), row);
// Insert the skiplist node into the data
tsdbInsertRowToTableImpl(pNode, pTable);
......@@ -434,14 +436,14 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) {
SDataRows rows = pBlock->data;
SDataRowsIter rDataIter, *pIter;
pIter = &rDataIter;
SDataRow row;
tdInitSDataRowsIter(rows, pIter);
while (!tdRdataIterEnd(pIter)) {
if (tdInsertRowToTable(pRepo, pIter->row, pTable) < 0) {
while ((row = tdDataRowsNext(pIter)) != NULL) {
if (tdInsertRowToTable(pRepo, row, pTable) < 0) {
// TODO: deal with the error here
}
tdRdataIterNext(pIter);
}
return 0;
......
......@@ -4,7 +4,7 @@
#include "tsdb.h"
#include "tsdbMeta.h"
TEST(TsdbTest, createTable) {
TEST(TsdbTest, DISABLED_createTable) {
STsdbMeta *pMeta = tsdbCreateMeta(100);
ASSERT_NE(pMeta, nullptr);
......@@ -54,6 +54,8 @@ TEST(TsdbTest, createRepo) {
int32_t size = sizeof(SSubmitMsg) + sizeof(SSubmitBlock) + tdMaxRowDataBytes(config.schema) * 10 + sizeof(int32_t);
tdUpdateSchema(config.schema);
SSubmitMsg *pMsg = (SSubmitMsg *)malloc(size);
pMsg->numOfTables = 1; // TODO: use api
......@@ -67,21 +69,19 @@ TEST(TsdbTest, createRepo) {
SDataRow row = tdNewDataRow(tdMaxRowDataBytes(config.schema));
int64_t ttime = 1583508800000;
void *pDst = pBlock->data;
for (int i = 0; i < 10; i++) { // loop over rows
ttime += (10000 * i);
tdDataRowReset(row);
for (int j = 0; j < schemaNCols(config.schema); j++) {
if (j == 0) { // set time stamp
tdAppendColVal(row, (void *)(&ttime), schemaColAt(config.schema, j), 24);
tdAppendColVal(row, (void *)(&ttime), schemaColAt(config.schema, j), 40);
} else { // set other fields
int val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(config.schema, j), 24);
int32_t val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(config.schema, j), 40);
}
}
dataRowCpy((void *)pDst, row);
pDst += dataRowLen(row);
tdDataRowsAppendRow(rows, row);
}
tsdbInsertData(pRepo, pMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册