From 48249f74afc392e85c5acbc5e1bebdec0147d217 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 28 Apr 2020 10:11:22 +0800 Subject: [PATCH] TD-166 --- src/common/inc/tdataformat.h | 30 +++++++++++ src/common/src/tdataformat.c | 100 +++++++++++++++++++---------------- 2 files changed, 85 insertions(+), 45 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index c938c1cfb1..7347782b89 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -20,6 +20,7 @@ #include #include "taosdef.h" +#include "tutil.h" #ifdef __cplusplus extern "C" { @@ -96,6 +97,18 @@ int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, i void tdDataRowReset(SDataRow row, STSchema *pSchema); SDataRow tdDataRowDup(SDataRow row); +static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) { + switch (type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + return dataRowAt(row, *(int32_t *)dataRowAt(row, offset)); + break; + default: + return row + offset; + break; + } +} + // ----------------- Data column structure typedef struct SDataCol { int8_t type; @@ -106,6 +119,23 @@ typedef struct SDataCol { void * pData; // Original data } SDataCol; +void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints); + +// Get the data pointer from a column-wised data +static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { + switch (pCol->type) + { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + return pCol->pData + ((int32_t *)(pCol->pData))[row]; + break; + + default: + return pCol->pData + TYPE_BYTES[pCol->type] * row; + break; + } +} + typedef struct { int maxRowSize; int maxCols; // max number of columns diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index bd3557cb44..8d12a6e43b 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ #include "tdataformat.h" -#include "tutil.h" #include "wchar.h" /** @@ -142,7 +141,7 @@ STSchema *tdDecodeSchema(void **psrc) { */ void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); } -SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { +SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { int32_t size = dataRowMaxBytesFromSchema(pSchema); SDataRow row = malloc(size); @@ -150,7 +149,7 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { tdInitDataRow(row, pSchema); return row; - } +} /** * Free the SDataRow object @@ -185,7 +184,7 @@ int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_ slen = strnlen((char *)value, bytes); } else { slen = wcsnlen((wchar_t *)value, (bytes) / TSDB_NCHAR_SIZE) * TSDB_NCHAR_SIZE; - } + } } ASSERT(slen <= bytes); @@ -214,6 +213,28 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } +void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints) { + ASSERT(pCol != NULL && value != NULL); + + switch (pCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + if (pCol->len == 0) pCol->len = sizeof(int32_t) * maxPoints; + // set offset + ((int32_t *)(pCol->pData))[numOfPoints] = pCol->len; + // Copy data + memcpy(pCol->pData + pCol->len, value, sizeof(int16_t) + *(int16_t *)value); + // Update the length + pCol->len += (sizeof(int16_t) + *(int16_t *)value); + break; + default: + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); + memcpy(pCol->pData + pCol->len, value, pCol->bytes); + pCol->len += pCol->bytes; + break; + } +} + SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows, int exColBytes) { SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); if (pCols == NULL) return NULL; @@ -293,35 +314,13 @@ void tdResetDataCols(SDataCols *pCols) { } void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { + ASSERT(dataColsKeyLast(pCols) < dataRowKey(row)); + for (int i = 0; i < pCols->numOfCols; i++) { SDataCol *pCol = pCols->cols + i; - void *ptr = NULL; - int32_t toffset = 0; + void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset); - switch (pCol->type) - { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - if (pCols->numOfPoints == 0) pCol->len = sizeof(int32_t) * pCols->maxPoints; - - // set offset - ((int32_t *)(pCol->pData))[pCols->numOfPoints] = pCol->len; - - // copy data - toffset = *(int32_t *)dataRowAt(row, pCol->offset); - ptr = dataRowAt(row, toffset); - memcpy(pCol->pData + pCol->len, ptr, *(int16_t *)ptr + sizeof(int16_t)); - // update length - pCol->len += *(int16_t *)ptr + sizeof(int16_t); - break; - default: - ASSERT(pCol->len == TYPE_BYTES[pCol->type] * pCols->numOfPoints); - // copy data - memcpy(pCol->pData + pCol->len, dataRowAt(row, pCol->offset), pCol->bytes); - // update length - pCol->len += pCol->bytes; - break; - } + dataColAppendVal(pCol, value, pCols->numOfPoints, pCols->maxPoints); } pCols->numOfPoints++; } @@ -336,7 +335,7 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { int32_t offsetSize = sizeof(int32_t) * pCols->maxPoints; int32_t toffset = 0; - int tlen = 0; + int tlen = 0; for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { SDataCol *pCol = pCols->cols + iCol; ASSERT(pCol->len > 0); @@ -371,14 +370,27 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); + ASSERT(target->numOfPoints + rowsToMerge <= target->maxPoints); + ASSERT(target->numOfCols == source->numOfCols); - SDataCols *pTarget = tdDupDataCols(target, true); - if (pTarget == NULL) goto _err; - // tdResetDataCols(target); + SDataCols *pTarget = NULL; - int iter1 = 0; - int iter2 = 0; - tdMergeTwoDataCols(target,pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); + if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap + for (int i = 0; i < rowsToMerge; i++) { + for (int j = 0; j < source->numOfCols; j++) { + dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfPoints, + target->maxPoints); + } + } + target->numOfPoints++; + } else { + pTarget = tdDupDataCols(target, true); + if (pTarget == NULL) goto _err; + + int iter1 = 0; + int iter2 = 0; + tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); + } tdFreeDataCols(pTarget); return 0; @@ -389,6 +401,7 @@ _err: } void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) { + // TODO: add resolve duplicate key here tdResetDataCols(target); while (target->numOfPoints < tRows) { @@ -400,10 +413,8 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol if (key1 < key2) { for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); - memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), - (void *)((char *)(src1->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * (*iter1)), - TYPE_BYTES[target->cols[i].type]); - target->cols[i].len += TYPE_BYTES[target->cols[i].type]; + dataColAppendVal(target->cols[i].pData, tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfPoints, + target->maxPoints); } target->numOfPoints++; @@ -411,15 +422,14 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol } else if (key1 > key2) { for (int i = 0; i < src2->numOfCols; i++) { ASSERT(target->cols[i].type == src2->cols[i].type); - memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), - (void *)((char *)(src2->cols[i].pData) + TYPE_BYTES[src2->cols[i].type] * (*iter2)), - TYPE_BYTES[target->cols[i].type]); - target->cols[i].len += TYPE_BYTES[target->cols[i].type]; + dataColAppendVal(target->cols[i].pData, tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfPoints, + target->maxPoints); } target->numOfPoints++; (*iter2)++; } else { + // TODO: deal with duplicate keys ASSERT(false); } } -- GitLab