diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 9470b07b9acb0c231cdf4ced5a9183ce58125d51..c0b15c6d4a19e6a80efbb53337d3a01be108d3d0 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -242,7 +242,7 @@ static void tTupleTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) { } } -static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) { +static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow, uint8_t flags) { int32_t nColVal = taosArrayGetSize(pArray); STColumn *pTColumn; SColVal *pColVal; @@ -250,6 +250,7 @@ static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) { ASSERT(nColVal > 0); pRow->sver = pTSchema->version; + pRow->flags = 0; // ts pTColumn = &pTSchema->columns[0]; @@ -265,7 +266,6 @@ static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) { uint32_t nv = 0; uint8_t *pv = NULL; uint8_t *pidx = NULL; - uint8_t flags = 0; int16_t nCol = 0; for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { pTColumn = &pTSchema->columns[iColumn]; @@ -296,29 +296,208 @@ static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) { } _set_none: - flags |= TSROW_HAS_NONE; + pRow->flags |= TSROW_HAS_NONE; continue; _set_null: - flags != TSROW_HAS_NULL; - pidx[nCol++] = nv; - // nv += tPutColVal(pv ? pv + nv : pv, pColVal, pTColumn->type, 0); + pRow->flags |= TSROW_HAS_NULL; + if (pidx) pidx[nCol++] = nv; + nv += tPutI16v(pv ? pv + nv : pv, -pTColumn->colId); continue; _set_value: - flags != TSROW_HAS_VAL; - pidx[nCol++] = nv; - // nv += tPutColVal(pv ? pv + nv : pv, pColVal, pTColumn->type, 0); + pRow->flags != TSROW_HAS_VAL; + if (pidx) pidx[nCol++] = nv; + nv += tPutI16v(pv ? pv + nv : pv, pTColumn->colId); + nv += tPutValue(pv ? pv + nv : pv, &pColVal->value, pTColumn->type); continue; } if (nv <= UINT8_MAX) { + pRow->flags |= TSROW_KV_SMALL; // small } else if (nv <= UINT16_MAX) { + pRow->flags |= TSROW_KV_MID; // mid } else { + pRow->flags |= TSROW_KV_BIG; // large } + + ASSERT(flags == 0 || pRow->flags == flags); +} + +static void tTSRowNewImpl(SArray *pArray, STSchema *pTSchema, STSRow2 *pRowT, STSRow2 *pRowK) { + int32_t nColVal = taosArrayGetSize(pArray); + STColumn *pTColumn; + SColVal *pColVal; + uint8_t tflags = 0; + uint8_t kflags = 0; + + ASSERT(nColVal > 0); + + // prepare + uint8_t *pb = NULL; + uint8_t *pf = NULL; + uint8_t *ptv = NULL; + uint32_t ntv = 0; + int8_t isBit1 = 0; + + STSKVRow kvRow = {0}; + STSKVRow *pTSKVRow = &kvRow; + uint8_t *pidx = NULL; + uint8_t *pkv = NULL; + uint32_t nkv = 0; + uint32_t maxIdx = 0; + + if (pRowT) { + tflags = pRowT->flags; + + pRowT->flags = 0; + pRowT->sver = pTSchema->version; + pRowT->nData = 0; + + if (tflags) { // build + } else { // try + } + } + if (pRowK) { + kflags = pRowK->flags; + + pRowK->flags = 0; + pRowK->sver = pTSchema->version; + pRowK->nData = 0; + + if (kflags) { // build + } else { // try + } + } + + // ts + pTColumn = &pTSchema->columns[0]; + pColVal = (SColVal *)taosArrayGet(pArray, 0); + + ASSERT(pTColumn->colId == 0 && pColVal->cid == 0); + ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); + + if (pRowT) { + pRowT->ts = pColVal->value.ts; + } + if (pRowK) { + pRowK->ts = pColVal->value.ts; + } + + // other + int32_t iColVal = 1; + for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { + pTColumn = &pTSchema->columns[iColumn]; + if (iColVal < nColVal) { + pColVal = (SColVal *)taosArrayGet(pArray, iColVal); + } else { + pColVal = NULL; + } + + if (pColVal) { + if (pColVal->cid == pTColumn->colId) { + iColVal++; + if (pColVal->isNone) { + goto _set_none; + } else if (pColVal->isNull) { + goto _set_null; + } else { + goto _set_value; + } + } else if (pColVal->cid > pTColumn->colId) { + goto _set_none; + } else { + ASSERT(0); + } + } else { + goto _set_none; + } + + _set_none: + if (pRowT) { + pRowT->flags |= TSROW_HAS_NONE; + // TODO + } + if (pRowK) { + pRowK->flags |= TSROW_HAS_NONE; + } + continue; + + _set_null: + if (pRowT) { + pRowT->flags |= TSROW_HAS_NULL; + // TODO + } + if (pRowK) { + pRowK->flags |= TSROW_HAS_NULL; + + if (kflags) { + if (kflags & TSROW_KV_SMALL) { + ((uint8_t *)pidx)[pTSKVRow->nCols] = nkv; + } else if (kflags & TSROW_KV_MID) { + ((uint16_t *)pidx)[pTSKVRow->nCols] = nkv; + } else { + ((uint32_t *)pidx)[pTSKVRow->nCols] = nkv; + } + } + + maxIdx = nkv; + pTSKVRow->nCols++; + nkv += tPutI16v(pkv ? pkv + nkv : pkv, -pTColumn->colId); + } + continue; + + _set_value: + if (pRowT) { + pRowT->flags |= TSROW_HAS_VAL; + // TODO + } + if (pRowK) { + pRowK->flags |= TSROW_HAS_VAL; + + if (kflags) { + if (kflags & TSROW_KV_SMALL) { + ((uint8_t *)pidx)[pTSKVRow->nCols] = nkv; + } else if (kflags & TSROW_KV_MID) { + ((uint16_t *)pidx)[pTSKVRow->nCols] = nkv; + } else { + ((uint32_t *)pidx)[pTSKVRow->nCols] = nkv; + } + } + + maxIdx = nkv; + pTSKVRow->nCols++; + nkv += tPutI16v(pkv ? pkv + nkv : pkv, pTColumn->colId); + nkv += tPutValue(pkv ? pkv + nkv : pkv, &pColVal->value, pTColumn->type); + } + continue; + } + + // finalize (todo) + if (pRowT) { + } + if (pRowK) { + if (pTSKVRow->nCols == 0) { + pRowK->nData = 0; + pRowK->flags |= TSROW_KV_SMALL; + } else { + if (maxIdx <= UINT8_MAX) { + pRowK->flags |= TSROW_KV_SMALL; + pRowK->nData = sizeof(STSKVRow) + sizeof(uint8_t) * pTSKVRow->nCols; + } else if (maxIdx <= UINT16_MAX) { + pRowK->flags |= TSROW_KV_MID; + pRowK->nData = sizeof(STSKVRow) + sizeof(uint16_t) * pTSKVRow->nCols; + } else { + pRowK->flags |= TSROW_KV_BIG; + pRowK->nData = sizeof(STSKVRow) + sizeof(uint32_t) * pTSKVRow->nCols; + } + } + + ASSERT(kflags == 0 || pRowK->flags == kflags); + } } // try-decide-build @@ -329,13 +508,13 @@ int32_t tTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow) { // try tTupleTSRowNew(pArray, pTSchema, &rowT); - tMapTSRowNew(pArray, pTSchema, &rowM); + tMapTSRowNew(pArray, pTSchema, &rowM, 0); // decide & build if (rowT.nData <= rowM.nData) { tTupleTSRowNew(pArray, pTSchema, &rowT); } else { - tMapTSRowNew(pArray, pTSchema, &rowM); + tMapTSRowNew(pArray, pTSchema, &rowM, rowM.flags); } return code;