From 9ed5a42cf60b4eadb7fc7f8d6aca0a2c9ed6838a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 7 Jul 2021 07:14:08 +0800 Subject: [PATCH] tsc raw data combination restructure --- src/client/inc/tscUtil.h | 31 +- src/client/inc/tsclient.h | 46 ++- src/client/src/tscParseInsert.c | 537 +++++++++++++++++++++++++++++--- src/client/src/tscUtil.c | 175 ++++++----- src/common/inc/tdataformat.h | 106 ++++++- src/common/src/tdataformat.c | 15 + src/cq/src/cqMain.c | 2 +- src/tsdb/src/tsdbCommit.c | 20 +- src/tsdb/src/tsdbMain.c | 6 +- src/tsdb/tests/tsdbTests.cpp | 4 +- 10 files changed, 774 insertions(+), 168 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 1a9fb4c665..e62661e6f6 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -40,7 +40,8 @@ extern "C" { #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) -#define KvRowNColsThresh 1 // default 1200. TODO: only for test, restore to default value after test finished +#define KvRowNColsThresh 1 // default 1200 +#define KVRowRatio 0.85 // for NonVarType, we get value from SDataRow directly, while needs readdressing for SKVRow #pragma pack(push,1) // this struct is transfered as binary, padding two bytes to avoid @@ -96,11 +97,21 @@ typedef struct SVgroupTableInfo { SArray *itemList; // SArray } SVgroupTableInfo; +typedef struct SBlockKeyTuple { + TSKEY skey; + void* payloadAddr; +} SBlockKeyTuple; + +typedef struct SBlockKeyInfo { + int32_t nBytesAlloc; + SBlockKeyTuple* pKeyTuple; +} SBlockKeyInfo; + int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); -void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); +int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo); void doRetrieveSubqueryData(SSchedMsg *pMsg); @@ -343,22 +354,6 @@ char* strdup_throw(const char* str); bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src); SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg); -typedef struct { - // for SDataRow - SSchema* pSchema; - int16_t sversion; - int32_t flen; - // for SKVRow - uint16_t nCols; - uint16_t size; - void* buf; - - void* pDataBlock; - SSubmitBlk* pSubmitBlk; -} SMemRowBuilder; - -SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder); - #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index c1243535b1..428cbf3391 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -95,6 +95,43 @@ typedef struct SParsedDataColInfo { SBoundColumn *cols; } SParsedDataColInfo; +typedef struct { + // for SDataRow + SSchema *pSchema; + int16_t sversion; + int32_t flen; + // for SKVRow + uint16_t nCols; + uint16_t size; + void * buf; + + void * pDataBlock; + SSubmitBlk *pSubmitBlk; + uint16_t allNullLen; +} SMemRowBuilder; + +int FORCE_INLINE initSMemRowBuilder(SMemRowBuilder *pBuilder, SSchema *pSSchema, uint16_t nCols, + uint16_t allNullColsLen) { + ASSERT(nCols > 0); + pBuilder->pSchema = pSSchema; + pBuilder->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta + if (pBuilder->allNullLen == 0) { + for (uint16_t i = 0; i < nCols; ++i) { + uint8_t type = pSSchema[i].type; + int32_t typeLen = TYPE_BYTES[type]; + ASSERT(typeLen > 0); + pBuilder->allNullLen += typeLen; + if (TSDB_DATA_TYPE_BINARY == type) { + pBuilder->allNullLen += (sizeof(VarDataLenT) + CHAR_BYTES); + } else if (TSDB_DATA_TYPE_NCHAR == type) { + int len = sizeof(VarDataLenT) + TSDB_NCHAR_SIZE; + pBuilder->allNullLen += len; + } + } + } + return 0; +} + typedef struct STableDataBlocks { SName tableName; int8_t tsSource; // where does the UNIX timestamp come from, server or client @@ -109,12 +146,13 @@ typedef struct STableDataBlocks { STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache char *pData; - SParsedDataColInfo boundColumnInfo; + SParsedDataColInfo boundColumnInfo; // for parameter ('?') binding - uint32_t numOfAllocedParams; - uint32_t numOfParams; - SParamInfo *params; + uint32_t numOfAllocedParams; + uint32_t numOfParams; + SParamInfo * params; + SMemRowBuilder rowBuilder; } STableDataBlocks; typedef struct { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 137a7be7c7..e8da633217 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -38,10 +38,16 @@ enum { TSDB_USE_CLI_TS = 1, }; +static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; +static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; + static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows); static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema, char *str, char **end); +static FORCE_INLINE int32_t getPaddingRowSize(STableComInfo *tinfo) { + return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN * tinfo->numOfColumns; +} static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) { errno = 0; *value = strtold(pToken->z, endPtr); @@ -378,6 +384,349 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha return TSDB_CODE_SUCCESS; } +static FORCE_INLINE uint16_t tsSetColumnValue(char *payload, int16_t columnId, uint8_t columnType, void *value, + uint16_t valueLen) { + payloadColSetId(payload, columnId); + payloadColSetType(payload, columnType); + + memcpy(payloadColValue(payload), value, valueLen); + return PAYLOAD_ID_TYPE_LEN + valueLen; +} + +static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, + bool primaryKey, int16_t timePrec, uint16_t *sizeAppend, bool *isColNull, + TDRowLenT *dataRowColDeltaLen, TDRowLenT *kvRowColLen) { + int64_t iv; + int32_t ret; + char * endptr = NULL; + + if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { + return tscInvalidOperationMsg(msg, "invalid numeric data", pToken->z); + } + + switch (pSchema->type) { + case TSDB_DATA_TYPE_BOOL: { // bool + if (isNullStr(pToken)) { + // *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL; + *isColNull = true; + } else { + if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { + if (strncmp(pToken->z, "true", pToken->n) == 0) { + // *(uint8_t *)payload = TSDB_TRUE; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &TRUE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]; + } else if (strncmp(pToken->z, "false", pToken->n) == 0) { + // *(uint8_t *)payload = TSDB_FALSE; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &FALSE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]; + } else { + return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z); + } + } else if (pToken->type == TK_INTEGER) { + iv = strtoll(pToken->z, NULL, 10); + // *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE); + *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, + ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]; + } else if (pToken->type == TK_FLOAT) { + double dv = strtod(pToken->z, NULL); + // *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE); + *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, + ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]; + } else { + return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z); + } + } + break; + } + + case TSDB_DATA_TYPE_TINYINT: + if (isNullStr(pToken)) { + // *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL; + *isColNull = true; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid tinyint data", pToken->z); + } else if (!IS_VALID_TINYINT(iv)) { + return tscInvalidOperationMsg(msg, "data overflow", pToken->z); + } + + // *((uint8_t *)payload) = (uint8_t)iv; + uint8_t tmpVal = (uint8_t)iv; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]; + } + + break; + + case TSDB_DATA_TYPE_UTINYINT: + if (isNullStr(pToken)) { + // *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL; + *isColNull = true; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned tinyint data", pToken->z); + } else if (!IS_VALID_UTINYINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z); + } + + // *((uint8_t *)payload) = (uint8_t)iv; + uint8_t tmpVal = (uint8_t)iv; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]; + } + + break; + + case TSDB_DATA_TYPE_SMALLINT: + if (isNullStr(pToken)) { + // *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL; + *isColNull = true; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid smallint data", pToken->z); + } else if (!IS_VALID_SMALLINT(iv)) { + return tscInvalidOperationMsg(msg, "smallint data overflow", pToken->z); + } + + int16_t tmpVal = (int16_t)iv; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT]; + } + + break; + + case TSDB_DATA_TYPE_USMALLINT: + if (isNullStr(pToken)) { + // *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL; + *isColNull = true; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned smallint data", pToken->z); + } else if (!IS_VALID_USMALLINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z); + } + + // *((uint16_t *)payload) = (uint16_t)iv; + uint16_t tmpVal = (uint16_t)iv; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]; + } + + break; + + case TSDB_DATA_TYPE_INT: + if (isNullStr(pToken)) { + // *((int32_t *)payload) = TSDB_DATA_INT_NULL; + *isColNull = true; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid int data", pToken->z); + } else if (!IS_VALID_INT(iv)) { + return tscInvalidOperationMsg(msg, "int data overflow", pToken->z); + } + + // *((int32_t *)payload) = (int32_t)iv; + int32_t tmpVal = (int32_t)iv; + *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_INT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_INT]; + } + + break; + + case TSDB_DATA_TYPE_UINT: + if (isNullStr(pToken)) { + // *((uint32_t *)payload) = TSDB_DATA_UINT_NULL; + *isColNull = true; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned int data", pToken->z); + } else if (!IS_VALID_UINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z); + } + + // *((uint32_t *)payload) = (uint32_t)iv; + uint32_t tmpVal = (uint32_t)iv; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UINT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UINT]; + } + + break; + + case TSDB_DATA_TYPE_BIGINT: + if (isNullStr(pToken)) { + // *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; + *isColNull = true; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid bigint data", pToken->z); + } else if (!IS_VALID_BIGINT(iv)) { + return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z); + } + + // *((int64_t *)payload) = iv; + // int64_t tmpVal = (int64_t)iv; + *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]; + } + break; + + case TSDB_DATA_TYPE_UBIGINT: + if (isNullStr(pToken)) { + // *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL; + *isColNull = true; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned bigint data", pToken->z); + } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { + return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z); + } + + // *((uint64_t *)payload) = iv; + *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]; + } + break; + + case TSDB_DATA_TYPE_FLOAT: + if (isNullStr(pToken)) { + // *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL; + *isColNull = true; + } else { + double dv; + if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { + return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); + } + + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || + isnan(dv)) { + return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); + } + + // *((float *)payload) = (float)dv; + // SET_FLOAT_VAL(payload, dv); + float tmpVal = (float)dv; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]; + } + break; + + case TSDB_DATA_TYPE_DOUBLE: + if (isNullStr(pToken)) { + // *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL; + *isColNull = true; + } else { + double dv; + if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { + return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); + } + + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { + return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); + } + + // *((double *)payload) = dv; + *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &dv, TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]; + } + break; + + case TSDB_DATA_TYPE_BINARY: + // binary data cannot be null-terminated char string, otherwise the last char of the string is lost + if (pToken->type == TK_NULL) { + // setVardataNull(payload, TSDB_DATA_TYPE_BINARY); + *isColNull = true; + } else { // too long values will return invalid sql, not be truncated automatically + if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor + return tscInvalidOperationMsg(msg, "string data overflow", pToken->z); + } + + // STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); + + payloadColSetId(payload, pSchema->colId); + payloadColSetType(payload, pSchema->type); + varDataSetLen(payload + PAYLOAD_ID_TYPE_LEN, pToken->n); + memcpy(varDataVal(payload + PAYLOAD_ID_TYPE_LEN), pToken->z, pToken->n); + *sizeAppend = PAYLOAD_ID_TYPE_LEN + sizeof(VarDataLenT) + pToken->n; + *dataRowColDeltaLen += (pToken->n - sizeof(uint8_t)); + *kvRowColLen += sizeof(SColIdx) + sizeof(VarDataLenT) + pToken->n; + } + + break; + + case TSDB_DATA_TYPE_NCHAR: + if (pToken->type == TK_NULL) { + // setVardataNull(payload, TSDB_DATA_TYPE_NCHAR); + *isColNull = true; + } else { + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t output = 0; + payloadColSetId(payload, pSchema->colId); + payloadColSetType(payload, pSchema->type); + if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload + PAYLOAD_ID_TYPE_LEN), + pSchema->bytes - VARSTR_HEADER_SIZE, &output)) { + char buf[512] = {0}; + snprintf(buf, tListLen(buf), "%s", strerror(errno)); + return tscInvalidOperationMsg(msg, buf, pToken->z); + } + + varDataSetLen(payload + PAYLOAD_ID_TYPE_LEN, output); + + *sizeAppend = PAYLOAD_ID_TYPE_LEN + sizeof(VarDataLenT) + output; + *dataRowColDeltaLen += (output - sizeof(uint32_t)); + *kvRowColLen += sizeof(SColIdx) + sizeof(VarDataLenT) + output; + } + break; + + case TSDB_DATA_TYPE_TIMESTAMP: { + if (pToken->type == TK_NULL) { + if (primaryKey) { + // *((int64_t *)payload) = 0; + // When building SKVRow primaryKey, we should not skip even with NULL value. + int64_t tmpVal = 0; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]; + } else { + // *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; + *isColNull = true; + } + } else { + int64_t tmpVal; + if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z); + } + + // *((int64_t *)payload) = tmpVal; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); + *kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]; + } + + break; + } + } + + return TSDB_CODE_SUCCESS; +} + /* * The server time/client time should not be mixed up in one sql string * Do not employ sort operation is not involved if server time is used. @@ -414,23 +763,32 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) { return TSDB_CODE_SUCCESS; } -int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, - char *tmpTokenBuf, SInsertStatementParam* pInsertParam) { - int32_t index = 0; - SStrToken sToken = {0}; - char *payload = pDataBlocks->pData + pDataBlocks->size; +int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf, + SInsertStatementParam *pInsertParam) { + int32_t index = 0; + SStrToken sToken = {0}; + + SMemRowBuilder *pBuilder = &pDataBlocks->rowBuilder; + char * payload = pDataBlocks->pData + pDataBlocks->size; SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; - SSchema *schema = tscGetTableSchema(pDataBlocks->pTableMeta); + SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta); // 1. set the parsed value from sql string - int32_t rowSize = 0; + int32_t rowSize = 0; + uint16_t rowSizeAppended = 0; + uint16_t nColsNotNull = 0; + TDRowLenT dataRowLen = pBuilder->allNullLen; + TDRowLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE; + + char *kvStart = payload; for (int i = 0; i < spd->numOfBound; ++i) { // the start position in data block buffer of current value in sql - int32_t colIndex = spd->boundedColumns[i]; + int32_t colIndex = spd->boundedColumns[i]; // ordered + + char *start = payload + spd->cols[colIndex].offset; - char *start = payload + spd->cols[colIndex].offset; - SSchema *pSchema = &schema[colIndex]; + SSchema *pSchema = &schema[colIndex]; // get colId here rowSize += pSchema->bytes; index = 0; @@ -453,7 +811,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i int16_t type = sToken.type; if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && - type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) { + type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || + (sToken.n == 0) || (type == TK_RP)) { return tscSQLSyntaxErrMsg(pInsertParam->msg, "invalid data or symbol", sToken.z); } @@ -467,10 +826,10 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) { return tscSQLSyntaxErrMsg(pInsertParam->msg, "too long string", sToken.z); } - + for (uint32_t k = 1; k < sToken.n - 1; ++k) { if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) { - tmpTokenBuf[j] = sToken.z[k + 1]; + tmpTokenBuf[j] = sToken.z[k + 1]; cnt++; j++; @@ -487,42 +846,44 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i sToken.n -= 2 + cnt; } - bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); - int32_t ret = tsParseOneColumn(pSchema, &sToken, start, pInsertParam->msg, str, isPrimaryKey, timePrec); + bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); + bool isColNull = false; + TDRowLenT dataRowDeltaColLen = 0; // When combine the data as SDataRow, the delta len between all NULL columns. + TDRowLenT kvRowColLen = 0; + uint16_t colSizeAppended = 0; + int32_t ret = + tsParseOneColumnKV(pSchema, &sToken, kvStart + PAYLOAD_HEADER_LEN, pInsertParam->msg, str, isPrimaryKey, + timePrec, &colSizeAppended, &isColNull, &dataRowDeltaColLen, &kvRowColLen); if (ret != TSDB_CODE_SUCCESS) { return ret; } - if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) { + if (isPrimaryKey && + tsCheckTimestamp(pDataBlocks, payloadKeyAddr(kvStart)) != TSDB_CODE_SUCCESS) { tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z); return TSDB_CODE_TSC_INVALID_TIME_STAMP; } - } - - // 2. set the null value for the columns that do not assign values - if (spd->numOfBound < spd->numOfCols) { - char *ptr = payload; - - for (int32_t i = 0; i < spd->numOfCols; ++i) { - if (!spd->cols[i].hasVal) { // current column do not have any value to insert, set it to null - if (schema[i].type == TSDB_DATA_TYPE_BINARY) { - varDataSetLen(ptr, sizeof(int8_t)); - *(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL; - } else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) { - varDataSetLen(ptr, sizeof(int32_t)); - *(uint32_t*) varDataVal(ptr) = TSDB_DATA_NCHAR_NULL; - } else { - setNull(ptr, schema[i].type, schema[i].bytes); - } - } - - ptr += schema[i].bytes; + if (isColNull == false) { + ++nColsNotNull; } + kvRowLen += kvRowColLen; + dataRowLen += dataRowDeltaColLen; + kvStart += colSizeAppended; // move to next column + rowSizeAppended += colSizeAppended; // calculate rowLen + } - rowSize = (int32_t)(ptr - payload); + if (kvRowLen < dataRowLen) { + payloadSetType(payload, SMEM_ROW_KV); + } else { + payloadSetType(payload, SMEM_ROW_DATA); } - *len = rowSize; + *(uint16_t *)(payload + sizeof(uint8_t)) = nColsNotNull; + *len = PAYLOAD_HEADER_LEN + rowSizeAppended; + + payloadSetNCols(payload, nColsNotNull); + payloadSetTLen(payload, *len); + return TSDB_CODE_SUCCESS; } @@ -551,21 +912,26 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn int32_t precision = tinfo.precision; + int32_t rowSizeWithColIdType = getPaddingRowSize(&tinfo); + + initSMemRowBuilder(&pDataBlock->rowBuilder, tscGetTableSchema(pDataBlock->pTableMeta), + tscGetNumOfColumns(pDataBlock->pTableMeta), 0); + while (1) { index = 0; sToken = tStrGetToken(*str, &index, false); if (sToken.n == 0 || sToken.type != TK_LP) break; *str += index; - if ((*numOfRows) >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) { + if ((*numOfRows) >= maxRows || pDataBlock->size + rowSizeWithColIdType >= pDataBlock->nAllocSize) { int32_t tSize; - code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize); + code = tscAllocateMemIfNeed(pDataBlock, rowSizeWithColIdType, &tSize); if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client strcpy(pInsertParam->msg, "client out of memory"); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - ASSERT(tSize > maxRows); + ASSERT(tSize >= maxRows); maxRows = tSize; } @@ -623,7 +989,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 // expand the allocated size if (remain < rowSize * factor) { while (remain < rowSize * factor) { - pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5); + pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 2); remain = pDataBlock->nAllocSize - pDataBlock->size; } @@ -657,7 +1023,7 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, } // data block is disordered, sort it in ascending order -void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) { +void tscSortRemoveDataBlockDupRowsXX(STableDataBlocks *dataBuf) { SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; // size is less than the total size, since duplicated rows may be removed yet. @@ -701,11 +1067,83 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) { dataBuf->prevTS = INT64_MIN; } +// data block is disordered, sort it in ascending order +int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) { + SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; + int16_t nRows = pBlocks->numOfRows; + + // size is less than the total size, since duplicated rows may be removed yet. + + // if use server time, this block must be ordered + if (dataBuf->tsSource == TSDB_USE_SERVER_TS) { + assert(dataBuf->ordered); + } + // allocate memory + size_t curBlkTupleSize = nRows * sizeof(SBlockKeyTuple); + if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->nBytesAlloc < curBlkTupleSize) { + char *tmp = realloc(pBlkKeyInfo->pKeyTuple, curBlkTupleSize); + if (tmp == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp; + pBlkKeyInfo->nBytesAlloc = curBlkTupleSize; + } + memset(pBlkKeyInfo->pKeyTuple, 0, curBlkTupleSize); + + SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + char * pBlockData = pBlocks->data; + int n = 0; + uint32_t totolPayloadLen = 0; + TDRowLenT payloadTLen = 0; + while (n < nRows) { + pBlkKeyTuple->skey = payloadKey(pBlockData); + pBlkKeyTuple->payloadAddr = pBlockData; + payloadTLen = payloadTLen(pBlockData); + totolPayloadLen += payloadTLen; + // next loop + pBlockData += payloadTLen; + ++pBlkKeyTuple; + ++n; + } + + if (!dataBuf->ordered) { + qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar); + + pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + int32_t i = 0; + int32_t j = 1; + while (j < nRows) { + TSKEY ti = *(TSKEY *)(pBlkKeyTuple + sizeof(SBlockKeyTuple) * i); + TSKEY tj = *(TSKEY *)(pBlkKeyTuple + sizeof(SBlockKeyTuple) * j); + + if (ti == tj) { + totolPayloadLen -= payloadTLen(pBlkKeyTuple + sizeof(SBlockKeyTuple) * j); + ++j; + continue; + } + + int32_t nextPos = (++i); + if (nextPos != j) { + memmove(pBlkKeyTuple + sizeof(SBlockKeyTuple) * nextPos, pBlkKeyTuple + sizeof(SBlockKeyTuple) * j, sizeof(SBlockKeyTuple)); + } + ++j; + } + + dataBuf->ordered = true; + pBlocks->numOfRows = i + 1; + } + + dataBuf->size = sizeof(SSubmitBlk) + totolPayloadLen; + dataBuf->prevTS = INT64_MIN; + + return 0; +} + static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) { STableComInfo tinfo = tscGetTableInfo(dataBuf->pTableMeta); int32_t maxNumOfRows; - int32_t code = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows); + int32_t code = tscAllocateMemIfNeed(dataBuf, getPaddingRowSize(&tinfo), &maxNumOfRows); if (TSDB_CODE_SUCCESS != code) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1489,21 +1927,24 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow } STableDataBlocks *pTableDataBlock = NULL; - int32_t ret = - tscGetDataBlockFromList(pInsertParam->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), - tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL); + int32_t ret = tscGetDataBlockFromList(pInsertParam->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, + sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, + &pTableDataBlock, NULL); if (ret != TSDB_CODE_SUCCESS) { pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } - tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows); + tscAllocateMemIfNeed(pTableDataBlock, getPaddingRowSize(&tinfo), &maxRows); tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW); if (tokenBuf == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + initSMemRowBuilder(&pTableDataBlock->rowBuilder, tscGetTableSchema(pTableDataBlock->pTableMeta), + tscGetNumOfColumns(pTableDataBlock->pTableMeta), 0); + while ((readLen = tgetline(&line, &n, fp)) != -1) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { line[--readLen] = 0; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index fb09ceb5cb..8f3455814f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1639,61 +1639,29 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i return TSDB_CODE_SUCCESS; } -static FORCE_INLINE uint8_t checkTdRowType(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen, - uint16_t* nColsNotNull) { - ASSERT(pData != NULL); - if (nCols < KvRowNColsThresh) { - return SMEM_ROW_DATA; - } - int32_t dataRowLength = flen; - int32_t kvRowLength = TD_MEM_ROW_KV_VER_SIZE; - - uint16_t nColsNull = 0; - char* p = (char*)pData; - for (int i = 0; i < nCols; ++i) { - if (IS_VAR_DATA_TYPE(pSchema[i].type)) { - dataRowLength += varDataTLen(p); - if (!isNull(p, pSchema[i].type)) { - kvRowLength += (sizeof(SColIdx) + varDataTLen(p)); - } else { - ++nColsNull; - } - } else { - if (!isNull(p, pSchema[i].type)) { - kvRowLength += (sizeof(SColIdx) + TYPE_BYTES[pSchema[i].type]); - } else { - ++nColsNull; - } - } - // next column - p += pSchema[i].bytes; - } - tscDebug("nColsNull %d, nCols: %d, kvRowLen: %d, dataRowLen: %d", (int32_t)nColsNull, nCols, kvRowLength, - dataRowLength); - if (kvRowLength < dataRowLength) { - if (nColsNotNull) { - *nColsNotNull = nCols - nColsNull; - } - return SMEM_ROW_KV; - } - return SMEM_ROW_DATA; -} -SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { +static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { SSchema* pSchema = pBuilder->pSchema; char* p = (char*)pBuilder->buf; int toffset = 0; - - if(pBuilder->nCols <= 0){ + uint16_t nCols = pBuilder->nCols; + +// RawRow payload structure: +// |<---------- header ------------->|<------- column data array ------->| +// |SMemRowType| dataLen | nCols | colId | colType | value |...|...| +// +-----------+----------+----------+---------------------------------->| +// | uint8_t | uint16_t | uint16_t | int16_t | uint8_t | ??? |...|...| +// +-----------+----------+----------+---------------------------------->| + uint8_t memRowType = payloadType(p); + uint16_t nColsNotNull = payloadNCols(p); + if (pBuilder->nCols <= 0 || nColsNotNull <= 0) { return NULL; } + ASSERT(nColsNotNull <= nCols); - uint16_t nColsNotNull = 0; - uint8_t memRowType = checkTdRowType(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull); - // nColsNotNull = pBuilder->nCols; SMemRow* memRow = (SMemRow)pBuilder->pDataBlock; memRowSetType(memRow, memRowType); @@ -1702,14 +1670,41 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen)); dataRowSetVersion(trow, pBuilder->sversion); - p = (char*)pBuilder->buf; - for (int32_t j = 0; j < pBuilder->nCols; ++j) { - tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); + p = (char*)payloadBody(pBuilder->buf); + uint16_t i = 0, j = 0; + while (j < pBuilder->nCols) { + if (i >= nColsNotNull) { + break; + } + int16_t colId = *(int16_t*)p; + if (colId == pSchema[j].colId) { + tdAppendColVal(trow, payloadColValue(p), pSchema[j].type, toffset); + toffset += TYPE_BYTES[pSchema[j].type]; + p = skipToNextEles(p); + ++i; + ++j; + } else if (colId < pSchema[j].colId) { + p = skipToNextEles(p); + ++i; + } else { + tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset); + toffset += TYPE_BYTES[pSchema[j].type]; + ++j; + } + } + + while (j < pBuilder->nCols) { + tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset); toffset += TYPE_BYTES[pSchema[j].type]; - p += pSchema[j].bytes; + ++j; + } + while (i < nColsNotNull) { + p = skipToNextEles(p); + ++i; } + pBuilder->buf = p; - } else if (memRowType == SMEM_ROW_KV) { + } else if (memRowType == SMEM_ROW_KV) { ASSERT(nColsNotNull <= pBuilder->nCols); SKVRow kvRow = (SKVRow)memRowKvBody(memRow); uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull; @@ -1717,14 +1712,17 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { kvRowSetNCols(kvRow, nColsNotNull); memRowKvSetVersion(memRow, pBuilder->sversion); - p = (char*)pBuilder->buf; - for (int32_t j = 0; j < pBuilder->nCols; ++j) { - if(!isNull(p, pSchema[j].type)) { - tdAppendKvColVal(kvRow, p, pSchema[j].colId, pSchema[j].type, toffset); - toffset += sizeof(SColIdx); - } - p += pSchema[j].bytes; + p = (char*)payloadBody(pBuilder->buf); + int i = 0; + while (i < nColsNotNull) { + int16_t colId = payloadColId(p); + uint8_t colType = payloadColType(p); + tdAppendKvColVal(kvRow, payloadColValue(p), colId, colType, toffset); + toffset += sizeof(SColIdx); + p = skipToNextEles(p); + ++i; } + pBuilder->buf = p; } else { @@ -1738,11 +1736,12 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { } // Erase the empty space reserved for binary data -static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) { +static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema, SBlockKeyTuple *blkKeyTuple) { // TODO: optimize this function, handle the case while binary is not presented - STableMeta* pTableMeta = pTableDataBlock->pTableMeta; - STableComInfo tinfo = tscGetTableInfo(pTableMeta); - SSchema* pSchema = tscGetTableSchema(pTableMeta); + STableMeta* pTableMeta = pTableDataBlock->pTableMeta; + STableComInfo tinfo = tscGetTableInfo(pTableMeta); + SSchema* pSchema = tscGetTableSchema(pTableMeta); + SMemRowBuilder* pBuilder = &pTableDataBlock->rowBuilder; SSubmitBlk* pBlock = pDataBlock; memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk)); @@ -1778,18 +1777,18 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo pBlock->dataLen = 0; int32_t numOfRows = htons(pBlock->numOfRows); - SMemRowBuilder mRowBuilder; - mRowBuilder.pSchema = pSchema; - mRowBuilder.sversion = pTableMeta->sversion; - mRowBuilder.flen = flen; - mRowBuilder.nCols = tinfo.numOfColumns; - mRowBuilder.pDataBlock = pDataBlock; - mRowBuilder.pSubmitBlk = pBlock; - mRowBuilder.buf = p; - mRowBuilder.size = 0; + pBuilder->pSchema = pSchema; + pBuilder->sversion = pTableMeta->sversion; + pBuilder->flen = flen; + pBuilder->nCols = tinfo.numOfColumns; + pBuilder->pDataBlock = pDataBlock; + pBuilder->pSubmitBlk = pBlock; + pBuilder->buf = p; + pBuilder->size = 0; for (int32_t i = 0; i < numOfRows; ++i) { - tdGenMemRowFromBuilder(&mRowBuilder); + pBuilder->buf = (blkKeyTuple+i)->payloadAddr; + tdGenMemRowFromBuilder(pBuilder); } int32_t len = pBlock->dataLen + pBlock->schemaLen; @@ -1807,9 +1806,8 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { if (IS_VAR_DATA_TYPE((pSchema + i)->type)) { result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; } - result += sizeof(SColIdx); } - result += TD_MEM_ROW_TYPE_SIZE; // add len of SMemRow flag + result += TD_MEM_ROW_KV_TYPE_VER_SIZE; // add prefix len of KV type SMemRow(we may use SDataRow or SKVRow) return result; } @@ -1837,14 +1835,17 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB } int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) { - const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); - - void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); + int code = 0; + void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); STableDataBlocks* pOneTableBlock = *p; + + SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock + while(pOneTableBlock) { SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; if (pBlocks->numOfRows > 0) { @@ -1858,6 +1859,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret); taosHashCleanup(pVnodeDataBlockHashList); tscDestroyBlockArrayList(pVnodeDataBlockList); + tfree(blkKeyInfo.pKeyTuple); return ret; } @@ -1878,16 +1880,26 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl taosHashCleanup(pVnodeDataBlockHashList); tscDestroyBlockArrayList(pVnodeDataBlockList); tfree(dataBuf->pData); + tfree(blkKeyInfo.pKeyTuple); return TSDB_CODE_TSC_OUT_OF_MEMORY; } } - tscSortRemoveDataBlockDupRows(pOneTableBlock); - char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); + if((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0){ + taosHashCleanup(pVnodeDataBlockHashList); + tscDestroyBlockArrayList(pVnodeDataBlockList); + tfree(dataBuf->pData); + tfree(blkKeyInfo.pKeyTuple); + return code; + } + + ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0); - tscDebug("0x%"PRIx64" name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), - pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); + SBlockKeyTuple* pLastKeyTuple = blkKeyInfo.pKeyTuple + pBlocks->numOfRows - 1; + tscDebug("0x%" PRIx64 " name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, + pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid, pBlocks->numOfRows, + pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey); int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); @@ -1898,7 +1910,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl pBlocks->schemaLen = 0; // erase the empty space reserved for binary data - int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam->schemaAttached); + int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam->schemaAttached, blkKeyInfo.pKeyTuple); assert(finalLen <= len); dataBuf->size += (finalLen + sizeof(SSubmitBlk)); @@ -1926,6 +1938,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl // free the table data blocks; pInsertParam->pDataBlocks = pVnodeDataBlockList; taosHashCleanup(pVnodeDataBlockHashList); + tfree(blkKeyInfo.pKeyTuple); return TSDB_CODE_SUCCESS; } diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index b0402d4674..49a0c0f120 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -24,6 +24,33 @@ extern "C" { #endif +#pragma pack(push, 1) +typedef struct { + VarDataLenT len; + uint8_t data; +} SBinaryNullT; + +typedef struct { + VarDataLenT len; + uint32_t data; +} SNCharNullT; +#pragma pack(pop) + +extern const uint8_t BoolNull; +extern const uint8_t TinyintNull; +extern const uint16_t SmallintNull; +extern const uint32_t IntNull; +extern const uint64_t BigintNull; +extern const uint64_t TimestampNull; +extern const uint8_t UTinyintNull; +extern const uint16_t USmallintNull; +extern const uint32_t UIntNull; +extern const uint64_t UBigintNull; +extern const uint32_t FloatNull; +extern const uint64_t DoubleNull; +extern const SBinaryNullT BinaryNull; +extern const SNCharNullT NcharNull; + #define STR_TO_VARSTR(x, str) \ do { \ VarDataLenT __len = (VarDataLenT)strlen(str); \ @@ -207,7 +234,7 @@ SDataRow tdDataRowDup(SDataRow row); SMemRow tdMemRowDup(SMemRow row); // offset here not include dataRow header length -static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { +static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) { ASSERT(value != NULL); int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); @@ -260,6 +287,42 @@ void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); +static FORCE_INLINE const void *tdGetNullVal(int8_t type) { + switch (type) { + case TSDB_DATA_TYPE_BOOL: + return &BoolNull; + case TSDB_DATA_TYPE_TINYINT: + return &TinyintNull; + case TSDB_DATA_TYPE_SMALLINT: + return &SmallintNull; + case TSDB_DATA_TYPE_INT: + return &IntNull; + case TSDB_DATA_TYPE_BIGINT: + return &BigintNull; + case TSDB_DATA_TYPE_FLOAT: + return &FloatNull; + case TSDB_DATA_TYPE_DOUBLE: + return &DoubleNull; + case TSDB_DATA_TYPE_BINARY: + return &BinaryNull; + case TSDB_DATA_TYPE_TIMESTAMP: + return &TimestampNull; + case TSDB_DATA_TYPE_NCHAR: + return &NcharNull; + case TSDB_DATA_TYPE_UTINYINT: + return &UTinyintNull; + case TSDB_DATA_TYPE_USMALLINT: + return &USmallintNull; + case TSDB_DATA_TYPE_UINT: + return &UIntNull; + case TSDB_DATA_TYPE_UBIGINT: + return &UBigintNull; + default: + ASSERT(0); + return NULL; + } +} + // Get the data pointer from a column-wised data static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { if (IS_VAR_DATA_TYPE(pCol->type)) { @@ -545,6 +608,47 @@ static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t o return NULL; } +// RawRow payload structure: +// |<---------- header ------------->|<---- body: column data tuple ---->| +// |SMemRowType| dataLen | nCols | colId | colType | value |...|...| +// +-----------+----------+----------+---------------------------------->| +// | uint8_t | uint16_t | uint16_t | int16_t | uint8_t | ??? |...|...| +// +-----------+----------+----------+---------------------------------->| + +#define PAYLOAD_NCOLS_LEN sizeof(uint16_t) +#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowLenT)) +#define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN) +#define PAYLOAD_ID_LEN sizeof(int16_t) +#define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t)) + +#define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN) +#define payloadType(r) (*(uint8_t *)(r)) +#define payloadSetType(r, t) (payloadType(r) = (t)) +#define payloadTLen(r) (*(TDRowLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header +#define payloadSetTLen(r, l) (payloadTLen(r) = (l)) +#define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET)) +#define payloadSetNCols(r, n) (payloadNCols(r) = (n)) + +#define payloadColId(r) (*(int16_t *)(r)) +#define payloadColType(r) (*(uint8_t *)POINTER_SHIFT(r, PAYLOAD_ID_LEN)) +#define payloadColValue(r) POINTER_SHIFT(r, PAYLOAD_ID_TYPE_LEN) + +#define payloadColSetId(r, i) (payloadColId(r) = (i)) +#define payloadColSetType(r, t) (payloadColType(r) = (t)) + +#define payloadKeyAddr(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN) +#define payloadTKey(r) (*(TKEY *)(payloadKeyAddr(r))) +#define payloadKey(r) tdGetKey(payloadTKey(r)) + +static FORCE_INLINE char *skipToNextEles(char *p) { + uint8_t colType = payloadColType(p); + if (IS_VAR_DATA_TYPE(colType)) { + return POINTER_SHIFT(p, PAYLOAD_ID_TYPE_LEN + varDataTLen(payloadColValue(p))); + } else { + return POINTER_SHIFT(p, PAYLOAD_ID_TYPE_LEN + TYPE_BYTES[colType]); + } +} + #ifdef __cplusplus } #endif diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index a46d2e84b0..5392cebe84 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -18,6 +18,21 @@ #include "tcoding.h" #include "wchar.h" +const uint8_t BoolNull = TSDB_DATA_BOOL_NULL; +const uint8_t TinyintNull = TSDB_DATA_TINYINT_NULL; +const uint16_t SmallintNull = TSDB_DATA_SMALLINT_NULL; +const uint32_t IntNull = TSDB_DATA_INT_NULL; +const uint64_t BigintNull = TSDB_DATA_BIGINT_NULL; +const uint64_t TimestampNull = TSDB_DATA_BIGINT_NULL; +const uint8_t UTinyintNull = TSDB_DATA_UTINYINT_NULL; +const uint16_t USmallintNull = TSDB_DATA_USMALLINT_NULL; +const uint32_t UIntNull = TSDB_DATA_UINT_NULL; +const uint64_t UBigintNull = TSDB_DATA_UBIGINT_NULL; +const uint32_t FloatNull = TSDB_DATA_FLOAT_NULL; +const uint64_t DoubleNull = TSDB_DATA_DOUBLE_NULL; +const SBinaryNullT BinaryNull = {1, TSDB_DATA_BINARY_NULL}; +const SNCharNullT NcharNull = {4, TSDB_DATA_NCHAR_NULL}; + static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows); diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index ee1022ea0c..cd762f3110 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -502,7 +502,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { memcpy((char *)val + sizeof(VarDataLenT), buf, len); varDataLen(val) = len; } - tdAppendColVal(dataRow, val, c->type, c->bytes, c->offset); + tdAppendColVal(dataRow, val, c->type, c->offset); } pBlk->dataLen = htonl(memRowDataTLen(trow)); pBlk->schemaLen = 0; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 69d855e57c..8718d5c35a 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -924,17 +924,17 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo continue; } - memset(pBlockCol, 0, sizeof(*pBlockCol)); - - pBlockCol->colId = pDataCol->colId; - pBlockCol->type = pDataCol->type; - if (tDataTypes[pDataCol->type].statisFunc) { - (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), - &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), - &(pBlockCol->numOfNull)); - } - nColsNotAllNull++; + memset(pBlockCol, 0, sizeof(*pBlockCol)); + + pBlockCol->colId = pDataCol->colId; + pBlockCol->type = pDataCol->type; + if (tDataTypes[pDataCol->type].statisFunc) { + (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), + &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), + &(pBlockCol->numOfNull)); } + nColsNotAllNull++; + } ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 272a915d61..fc152231de 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -720,7 +720,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // OK,let's load row from backward to get not-null column for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; - tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->offset); //SDataCol *pDataCol = readh.pDCols[0]->cols + j; void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); if (isNull(value, pCol->type)) { @@ -742,7 +742,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // save row ts(in column 0) pDataCol = pReadh->pDCols[0]->cols + 0; pCol = schemaColAt(pSchema, 0); - tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->offset); pLastCol->ts = memRowKey(row); pTable->restoreColumnNum += 1; @@ -790,7 +790,7 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, STColumn *pCol = schemaColAt(pSchema, icol); SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, - pCol->bytes, pCol->offset); + pCol->offset); } return 0; diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index ac254d6c34..dc804856fd 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -55,10 +55,10 @@ static int insertData(SInsertInfo *pInfo) { for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) { STColumn *pTCol = schemaColAt(pInfo->pSchema, j); if (j == 0) { // Just for timestamp - tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->bytes, pTCol->offset); + tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->offset); } else { // For int int val = 10; - tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->bytes, pTCol->offset); + tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->offset); } } pBlock->dataLen += dataRowLen(row); -- GitLab