diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index e62661e6f6224c5114901f645f9103a50133039a..049d03a4389b77ef3c8f40b43fd9f1a70d0d82e6 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -103,7 +103,7 @@ typedef struct SBlockKeyTuple { } SBlockKeyTuple; typedef struct SBlockKeyInfo { - int32_t nBytesAlloc; + int32_t maxBytesAlloc; SBlockKeyTuple* pKeyTuple; } SBlockKeyInfo; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 70d4f8979efae2f8c7352be44fd73577f15dee56..dd5980053ccb94d873555777e9534428f7aeea83 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -45,7 +45,7 @@ static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSiz static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema, char *str, char **end); -static FORCE_INLINE int32_t getPaddingRowSize(STableComInfo *tinfo) { +static FORCE_INLINE int32_t getExtendedRowSize(STableComInfo *tinfo) { return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN * tinfo->numOfColumns; } int initSMemRowBuilder(SMemRowBuilder *pBuilder, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) { @@ -427,17 +427,14 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri switch (pSchema->type) { case TSDB_DATA_TYPE_BOOL: { // bool if (isNullStr(pToken)) { - // *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL; *isColNull = true; } else { if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { if (strncmp(pToken->z, "true", pToken->n) == 0) { - // *(uint8_t *)payload = TSDB_TRUE; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &TRUE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); } else if (strncmp(pToken->z, "false", pToken->n) == 0) { - // *(uint8_t *)payload = TSDB_FALSE; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &FALSE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); @@ -446,13 +443,11 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } } else if (pToken->type == TK_INTEGER) { iv = strtoll(pToken->z, NULL, 10); - // *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE); *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); } else if (pToken->type == TK_FLOAT) { double dv = strtod(pToken->z, NULL); - // *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE); *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); @@ -465,7 +460,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_TINYINT: if (isNullStr(pToken)) { - // *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL; *isColNull = true; } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); @@ -475,7 +469,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "data overflow", pToken->z); } - // *((uint8_t *)payload) = (uint8_t)iv; uint8_t tmpVal = (uint8_t)iv; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]); @@ -486,7 +479,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_UTINYINT: if (isNullStr(pToken)) { - // *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL; *isColNull = true; } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); @@ -496,7 +488,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z); } - // *((uint8_t *)payload) = (uint8_t)iv; uint8_t tmpVal = (uint8_t)iv; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]); @@ -527,7 +518,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_USMALLINT: if (isNullStr(pToken)) { - // *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL; *isColNull = true; } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); @@ -537,7 +527,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z); } - // *((uint16_t *)payload) = (uint16_t)iv; uint16_t tmpVal = (uint16_t)iv; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]); @@ -548,7 +537,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_INT: if (isNullStr(pToken)) { - // *((int32_t *)payload) = TSDB_DATA_INT_NULL; *isColNull = true; } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); @@ -558,7 +546,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "int data overflow", pToken->z); } - // *((int32_t *)payload) = (int32_t)iv; int32_t tmpVal = (int32_t)iv; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_INT]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_INT]); @@ -568,7 +555,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_UINT: if (isNullStr(pToken)) { - // *((uint32_t *)payload) = TSDB_DATA_UINT_NULL; *isColNull = true; } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); @@ -578,7 +564,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z); } - // *((uint32_t *)payload) = (uint32_t)iv; uint32_t tmpVal = (uint32_t)iv; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UINT]); @@ -589,7 +574,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_BIGINT: if (isNullStr(pToken)) { - // *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; *isColNull = true; } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); @@ -599,8 +583,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z); } - // *((int64_t *)payload) = iv; - // int64_t tmpVal = (int64_t)iv; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]); } @@ -608,7 +590,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_UBIGINT: if (isNullStr(pToken)) { - // *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL; *isColNull = true; } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); @@ -618,15 +599,15 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z); } - // *((uint64_t *)payload) = iv; - *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]); + uint64_t tmpVal = (uint64_t)iv; + *sizeAppend = + tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]); } break; case TSDB_DATA_TYPE_FLOAT: if (isNullStr(pToken)) { - // *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL; *isColNull = true; } else { double dv; @@ -639,8 +620,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); } - // *((float *)payload) = (float)dv; - // SET_FLOAT_VAL(payload, dv); float tmpVal = (float)dv; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]); @@ -650,7 +629,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_DOUBLE: if (isNullStr(pToken)) { - // *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL; *isColNull = true; } else { double dv; @@ -662,7 +640,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); } - // *((double *)payload) = dv; *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &dv, TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]); } @@ -682,43 +659,41 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri payloadColSetId(payload, pSchema->colId); payloadColSetType(payload, pSchema->type); - varDataSetLen(payload + PAYLOAD_ID_TYPE_LEN, pToken->n); - memcpy(varDataVal(payload + PAYLOAD_ID_TYPE_LEN), pToken->z, pToken->n); - *sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + sizeof(VarDataLenT) + pToken->n); + varDataSetLen(payloadColValue(payload), pToken->n); + memcpy(varDataVal(payloadColValue(payload)), pToken->z, pToken->n); + *sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + VARSTR_HEADER_SIZE + pToken->n); *dataRowColDeltaLen += (TDRowLenT)(pToken->n - sizeof(uint8_t)); - *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + sizeof(VarDataLenT) + pToken->n); + *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + pToken->n); } break; case TSDB_DATA_TYPE_NCHAR: if (pToken->type == TK_NULL) { - // setVardataNull(payload, TSDB_DATA_TYPE_NCHAR); *isColNull = true; } else { // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' int32_t output = 0; payloadColSetId(payload, pSchema->colId); payloadColSetType(payload, pSchema->type); - if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload + PAYLOAD_ID_TYPE_LEN), + if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payloadColValue(payload)), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) { char buf[512] = {0}; snprintf(buf, tListLen(buf), "%s", strerror(errno)); return tscInvalidOperationMsg(msg, buf, pToken->z); } - varDataSetLen(payload + PAYLOAD_ID_TYPE_LEN, output); + varDataSetLen(payloadColValue(payload), output); - *sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + sizeof(VarDataLenT) + output); + *sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + VARSTR_HEADER_SIZE + output); *dataRowColDeltaLen += (TDRowLenT)(output - sizeof(uint32_t)); - *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + sizeof(VarDataLenT) + output); + *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + output); } break; case TSDB_DATA_TYPE_TIMESTAMP: { if (pToken->type == TK_NULL) { if (primaryKey) { - // *((int64_t *)payload) = 0; // When building SKVRow primaryKey, we should not skip even with NULL value. int64_t tmpVal = 0; @@ -726,7 +701,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); } else { - // *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; *isColNull = true; } } else { @@ -735,8 +709,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z); } - // *((int64_t *)payload) = tmpVal; - *sizeAppend = tsSetColumnValue(primaryKey ? primaryKeyStart : payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); @@ -803,8 +775,10 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i TDRowLenT dataRowLen = pBuilder->allNullLen; TDRowLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE; - char *kvPrimayKeyStart = payload + PAYLOAD_HEADER_LEN; - char *kvStart = kvPrimayKeyStart + PLAYLOAD_PRIMARY_COL_LEN; // make sure 1st column tuple is primaryKey + ASSERT(dataRowLen > 0); + + char *kvPrimayKeyStart = payload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple + char *kvStart = kvPrimayKeyStart + PAYLOAD_PRIMARY_COL_LEN; // the column tuple behind the primaryKey for (int i = 0; i < spd->numOfBound; ++i) { // the start position in data block buffer of current value in sql @@ -882,8 +856,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i return ret; } - if (isPrimaryKey && - tsCheckTimestamp(pDataBlocks, payloadKeyAddr(kvStart)) != TSDB_CODE_SUCCESS) { + if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, payloadColValue(kvPrimayKeyStart)) != TSDB_CODE_SUCCESS) { tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z); return TSDB_CODE_TSC_INVALID_TIME_STAMP; } @@ -904,7 +877,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i payloadSetType(payload, SMEM_ROW_DATA); } - *(uint16_t *)(payload + sizeof(uint8_t)) = nColsNotNull; *len = PAYLOAD_HEADER_LEN + rowSizeAppended; payloadSetNCols(payload, nColsNotNull); @@ -938,7 +910,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn int32_t precision = tinfo.precision; - int32_t rowSizeWithColIdType = getPaddingRowSize(&tinfo); + int32_t extendedRowSize = getExtendedRowSize(&tinfo); initSMemRowBuilder(&pDataBlock->rowBuilder, tscGetTableSchema(pDataBlock->pTableMeta), tscGetNumOfColumns(pDataBlock->pTableMeta), 0); @@ -949,9 +921,9 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn if (sToken.n == 0 || sToken.type != TK_LP) break; *str += index; - if ((*numOfRows) >= maxRows || pDataBlock->size + rowSizeWithColIdType >= pDataBlock->nAllocSize) { + if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) { int32_t tSize; - code = tscAllocateMemIfNeed(pDataBlock, rowSizeWithColIdType, &tSize); + code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize); if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client strcpy(pInsertParam->msg, "client out of memory"); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -1048,8 +1020,9 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, } } +#if 0 // data block is disordered, sort it in ascending order -void tscSortRemoveDataBlockDupRowsXX(STableDataBlocks *dataBuf) { +static void tscSortRemoveDataBlockDupRowsOld(STableDataBlocks *dataBuf) { SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; // size is less than the total size, since duplicated rows may be removed yet. @@ -1092,12 +1065,13 @@ void tscSortRemoveDataBlockDupRowsXX(STableDataBlocks *dataBuf) { dataBuf->prevTS = INT64_MIN; } +#endif // data block is disordered, sort it in ascending order int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) { SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; - int16_t nRows = pBlocks->numOfRows; - + int16_t nRows = pBlocks->numOfRows; + // size is less than the total size, since duplicated rows may be removed yet. // if use server time, this block must be ordered @@ -1105,22 +1079,23 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk assert(dataBuf->ordered); } // allocate memory - size_t curBlkTupleSize = nRows * sizeof(SBlockKeyTuple); - if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->nBytesAlloc < curBlkTupleSize) { - char *tmp = realloc(pBlkKeyInfo->pKeyTuple, curBlkTupleSize); + size_t nAlloc = nRows * sizeof(SBlockKeyTuple); + if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) { + size_t nRealAlloc = nAlloc + 10 * sizeof(SBlockKeyTuple); + char * tmp = realloc(pBlkKeyInfo->pKeyTuple, nRealAlloc); if (tmp == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + return TSDB_CODE_TSC_OUT_OF_MEMORY; } pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp; - pBlkKeyInfo->nBytesAlloc = (int32_t)curBlkTupleSize; + pBlkKeyInfo->maxBytesAlloc = (int32_t)nRealAlloc; } - memset(pBlkKeyInfo->pKeyTuple, 0, curBlkTupleSize); + memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc); SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; - char * pBlockData = pBlocks->data; - int n = 0; - uint32_t totolPayloadLen = 0; - TDRowLenT payloadTLen = 0; + char * pBlockData = pBlocks->data; + uint32_t totolPayloadLen = 0; + TDRowLenT payloadTLen = 0; + int n = 0; while (n < nRows) { pBlkKeyTuple->skey = payloadKey(pBlockData); pBlkKeyTuple->payloadAddr = pBlockData; @@ -1170,7 +1145,7 @@ static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char STableComInfo tinfo = tscGetTableInfo(dataBuf->pTableMeta); int32_t maxNumOfRows; - int32_t code = tscAllocateMemIfNeed(dataBuf, getPaddingRowSize(&tinfo), &maxNumOfRows); + int32_t code = tscAllocateMemIfNeed(dataBuf, getExtendedRowSize(&tinfo), &maxNumOfRows); if (TSDB_CODE_SUCCESS != code) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1962,7 +1937,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow goto _error; } - tscAllocateMemIfNeed(pTableDataBlock, getPaddingRowSize(&tinfo), &maxRows); + tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(&tinfo), &maxRows); tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW); if (tokenBuf == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9870262b65480490f99b3aa8ed71ae9bb0651766..8a3e8cad32b50bd99a5668b40caea8e9faadd758 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1796,7 +1796,8 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo } static int32_t getRowExpandSize(STableMeta* pTableMeta) { - int32_t result = TD_DATA_ROW_HEAD_SIZE; + // add prefix len of KV type SMemRow(we may use SDataRow or SKVRow) + int32_t result = TD_DATA_ROW_HEAD_SIZE + TD_MEM_ROW_KV_TYPE_VER_SIZE; int32_t columns = tscGetNumOfColumns(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta); for(int32_t i = 0; i < columns; i++) { @@ -1804,7 +1805,6 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; } } - result += TD_MEM_ROW_KV_TYPE_VER_SIZE; // add prefix len of KV type SMemRow(we may use SDataRow or SKVRow) return result; } @@ -1835,7 +1835,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); + SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); @@ -1883,11 +1883,11 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl } } - if((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0){ - taosHashCleanup(pVnodeDataBlockHashList); - tscDestroyBlockArrayList(pVnodeDataBlockList); - tfree(dataBuf->pData); - tfree(blkKeyInfo.pKeyTuple); + if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { + taosHashCleanup(pVnodeDataBlockHashList); + tscDestroyBlockArrayList(pVnodeDataBlockList); + tfree(dataBuf->pData); + tfree(blkKeyInfo.pKeyTuple); return code; } diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index dd35e94cfcf71240d8fe07023b822ebbe6406f03..093b7c51880364b66d7ffef3ec4ed1a0cc627da0 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -620,7 +620,7 @@ static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t o #define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN) #define PAYLOAD_ID_LEN sizeof(int16_t) #define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t)) -#define PLAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY)) +#define PAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY)) #define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN) #define payloadType(r) (*(uint8_t *)(r))