提交 48249f74 编写于 作者: H hzcheng

TD-166

上级 a9a386c6
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <string.h> #include <string.h>
#include "taosdef.h" #include "taosdef.h"
#include "tutil.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -96,6 +97,18 @@ int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, i ...@@ -96,6 +97,18 @@ int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, i
void tdDataRowReset(SDataRow row, STSchema *pSchema); void tdDataRowReset(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
return dataRowAt(row, *(int32_t *)dataRowAt(row, offset));
break;
default:
return row + offset;
break;
}
}
// ----------------- Data column structure // ----------------- Data column structure
typedef struct SDataCol { typedef struct SDataCol {
int8_t type; int8_t type;
...@@ -106,6 +119,23 @@ typedef struct SDataCol { ...@@ -106,6 +119,23 @@ typedef struct SDataCol {
void * pData; // Original data void * pData; // Original data
} SDataCol; } SDataCol;
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints);
// Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
switch (pCol->type)
{
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
return pCol->pData + ((int32_t *)(pCol->pData))[row];
break;
default:
return pCol->pData + TYPE_BYTES[pCol->type] * row;
break;
}
}
typedef struct { typedef struct {
int maxRowSize; int maxRowSize;
int maxCols; // max number of columns int maxCols; // max number of columns
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tdataformat.h" #include "tdataformat.h"
#include "tutil.h"
#include "wchar.h" #include "wchar.h"
/** /**
...@@ -142,7 +141,7 @@ STSchema *tdDecodeSchema(void **psrc) { ...@@ -142,7 +141,7 @@ STSchema *tdDecodeSchema(void **psrc) {
*/ */
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); } void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); }
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
int32_t size = dataRowMaxBytesFromSchema(pSchema); int32_t size = dataRowMaxBytesFromSchema(pSchema);
SDataRow row = malloc(size); SDataRow row = malloc(size);
...@@ -150,7 +149,7 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { ...@@ -150,7 +149,7 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
tdInitDataRow(row, pSchema); tdInitDataRow(row, pSchema);
return row; return row;
} }
/** /**
* Free the SDataRow object * Free the SDataRow object
...@@ -185,7 +184,7 @@ int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_ ...@@ -185,7 +184,7 @@ int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_
slen = strnlen((char *)value, bytes); slen = strnlen((char *)value, bytes);
} else { } else {
slen = wcsnlen((wchar_t *)value, (bytes) / TSDB_NCHAR_SIZE) * TSDB_NCHAR_SIZE; slen = wcsnlen((wchar_t *)value, (bytes) / TSDB_NCHAR_SIZE) * TSDB_NCHAR_SIZE;
} }
} }
ASSERT(slen <= bytes); ASSERT(slen <= bytes);
...@@ -214,6 +213,28 @@ SDataRow tdDataRowDup(SDataRow row) { ...@@ -214,6 +213,28 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow; return trow;
} }
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
switch (pCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (pCol->len == 0) pCol->len = sizeof(int32_t) * maxPoints;
// set offset
((int32_t *)(pCol->pData))[numOfPoints] = pCol->len;
// Copy data
memcpy(pCol->pData + pCol->len, value, sizeof(int16_t) + *(int16_t *)value);
// Update the length
pCol->len += (sizeof(int16_t) + *(int16_t *)value);
break;
default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints);
memcpy(pCol->pData + pCol->len, value, pCol->bytes);
pCol->len += pCol->bytes;
break;
}
}
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows, int exColBytes) { SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows, int exColBytes) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
if (pCols == NULL) return NULL; if (pCols == NULL) return NULL;
...@@ -293,35 +314,13 @@ void tdResetDataCols(SDataCols *pCols) { ...@@ -293,35 +314,13 @@ void tdResetDataCols(SDataCols *pCols) {
} }
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));
for (int i = 0; i < pCols->numOfCols; i++) { for (int i = 0; i < pCols->numOfCols; i++) {
SDataCol *pCol = pCols->cols + i; SDataCol *pCol = pCols->cols + i;
void *ptr = NULL; void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset);
int32_t toffset = 0;
switch (pCol->type) dataColAppendVal(pCol, value, pCols->numOfPoints, pCols->maxPoints);
{
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (pCols->numOfPoints == 0) pCol->len = sizeof(int32_t) * pCols->maxPoints;
// set offset
((int32_t *)(pCol->pData))[pCols->numOfPoints] = pCol->len;
// copy data
toffset = *(int32_t *)dataRowAt(row, pCol->offset);
ptr = dataRowAt(row, toffset);
memcpy(pCol->pData + pCol->len, ptr, *(int16_t *)ptr + sizeof(int16_t));
// update length
pCol->len += *(int16_t *)ptr + sizeof(int16_t);
break;
default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * pCols->numOfPoints);
// copy data
memcpy(pCol->pData + pCol->len, dataRowAt(row, pCol->offset), pCol->bytes);
// update length
pCol->len += pCol->bytes;
break;
}
} }
pCols->numOfPoints++; pCols->numOfPoints++;
} }
...@@ -336,7 +335,7 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { ...@@ -336,7 +335,7 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
int32_t offsetSize = sizeof(int32_t) * pCols->maxPoints; int32_t offsetSize = sizeof(int32_t) * pCols->maxPoints;
int32_t toffset = 0; int32_t toffset = 0;
int tlen = 0; int tlen = 0;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *pCol = pCols->cols + iCol; SDataCol *pCol = pCols->cols + iCol;
ASSERT(pCol->len > 0); ASSERT(pCol->len > 0);
...@@ -371,14 +370,27 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { ...@@ -371,14 +370,27 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints);
ASSERT(target->numOfPoints + rowsToMerge <= target->maxPoints);
ASSERT(target->numOfCols == source->numOfCols);
SDataCols *pTarget = tdDupDataCols(target, true); SDataCols *pTarget = NULL;
if (pTarget == NULL) goto _err;
// tdResetDataCols(target);
int iter1 = 0; if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
int iter2 = 0; for (int i = 0; i < rowsToMerge; i++) {
tdMergeTwoDataCols(target,pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); for (int j = 0; j < source->numOfCols; j++) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfPoints,
target->maxPoints);
}
}
target->numOfPoints++;
} else {
pTarget = tdDupDataCols(target, true);
if (pTarget == NULL) goto _err;
int iter1 = 0;
int iter2 = 0;
tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge);
}
tdFreeDataCols(pTarget); tdFreeDataCols(pTarget);
return 0; return 0;
...@@ -389,6 +401,7 @@ _err: ...@@ -389,6 +401,7 @@ _err:
} }
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) { void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
// TODO: add resolve duplicate key here
tdResetDataCols(target); tdResetDataCols(target);
while (target->numOfPoints < tRows) { while (target->numOfPoints < tRows) {
...@@ -400,10 +413,8 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol ...@@ -400,10 +413,8 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol
if (key1 < key2) { if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) { for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), dataColAppendVal(target->cols[i].pData, tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfPoints,
(void *)((char *)(src1->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * (*iter1)), target->maxPoints);
TYPE_BYTES[target->cols[i].type]);
target->cols[i].len += TYPE_BYTES[target->cols[i].type];
} }
target->numOfPoints++; target->numOfPoints++;
...@@ -411,15 +422,14 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol ...@@ -411,15 +422,14 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol
} else if (key1 > key2) { } else if (key1 > key2) {
for (int i = 0; i < src2->numOfCols; i++) { for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type); ASSERT(target->cols[i].type == src2->cols[i].type);
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), dataColAppendVal(target->cols[i].pData, tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfPoints,
(void *)((char *)(src2->cols[i].pData) + TYPE_BYTES[src2->cols[i].type] * (*iter2)), target->maxPoints);
TYPE_BYTES[target->cols[i].type]);
target->cols[i].len += TYPE_BYTES[target->cols[i].type];
} }
target->numOfPoints++; target->numOfPoints++;
(*iter2)++; (*iter2)++;
} else { } else {
// TODO: deal with duplicate keys
ASSERT(false); ASSERT(false);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册