diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 7f35cf0ea5080cbb49db3a78b7d53df58cb9724c..c664c4991d9aeb8166b9d030cf6b3db58ce52ebd 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -531,14 +531,15 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen; } -static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { +static FORCE_INLINE bool checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { if (isDataRow(row)) { if (kvLen < (dataLen * KVRatioConvert)) { - memRowSetConvert(row); + return true; } } else if (kvLen > dataLen) { - memRowSetConvert(row); + return true; } + return false; } static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 94f9a3018aae175f0f27c1c24b735f5a0392102d..76f05ad99a97b4752b6f982711af2f8b6ccda61b 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -444,12 +444,14 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) { return TSDB_CODE_SUCCESS; } -int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf, - SInsertStatementParam *pInsertParam) { +int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, bool *isConverted, int32_t rowSize, + char *tmpTokenBuf, SInsertStatementParam *pInsertParam) { int32_t index = 0; SStrToken sToken = {0}; - char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header + char *row = pDataBlocks->pData + + ((*isConverted) ? (pDataBlocks->size - rowSize) : pDataBlocks->size); // skip the SSubmitBlk header + *isConverted = false; SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; STableMeta * pTableMeta = pDataBlocks->pTableMeta; @@ -550,12 +552,13 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i if (!isParseBindParam) { // 2. check and set convert flag + bool isNeedConvertRow = false; if (pBuilder->compareStat == ROW_COMPARE_NEED) { - checkAndConvertMemRow(row, dataLen, kvLen); + isNeedConvertRow = checkAndConvertMemRow(row, dataLen, kvLen); } // 3. set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) { + if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow) { SDataRow dataRow = memRowDataBody(row); for (int32_t i = 0; i < spd->numOfCols; ++i) { if (spd->cols[i].valStat == VAL_STAT_NONE) { @@ -563,9 +566,14 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i } } } - } - *len = getExtendedRowSize(pDataBlocks); + // 4. perform the convert + if (isNeedConvertRow) { + // put converted row to next location to minimize the memcpy + convertSMemRow(row + rowSize, row, pDataBlocks); + *isConverted = true; + } + } return TSDB_CODE_SUCCESS; } @@ -623,13 +631,15 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn pDataBlock->boundColumnInfo.allNullLen))) { return code; } + bool isConverted = false; while (1) { index = 0; sToken = tStrGetToken(*str, &index, false); if (sToken.n == 0 || sToken.type != TK_LP) break; *str += index; - if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) { + // allocate 1 more row size to facilitate the SDataRow/SKVRow convert + if ((*numOfRows + 1) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) { int32_t tSize; code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize); if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client @@ -637,17 +647,16 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn return TSDB_CODE_TSC_OUT_OF_MEMORY; } - ASSERT(tSize >= maxRows); + ASSERT(tSize > maxRows); // 1 more row allocated maxRows = tSize; } - int32_t len = 0; - code = tsParseOneRow(str, pDataBlock, precision, &len, tmpTokenBuf, pInsertParam); + code = tsParseOneRow(str, pDataBlock, precision, &isConverted, extendedRowSize, tmpTokenBuf, pInsertParam); if (code != TSDB_CODE_SUCCESS) { // error message has been set in tsParseOneRow, return directly return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } - pDataBlock->size += len; + pDataBlock->size += extendedRowSize; index = 0; sToken = tStrGetToken(*str, &index, false); @@ -659,6 +668,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn (*numOfRows)++; } + if (isConverted) { + void *convertedSMemRow = pDataBlock->pData + pDataBlock->size; + memcpy(convertedSMemRow - extendedRowSize, convertedSMemRow, memRowTLen(convertedSMemRow)); + } if ((*numOfRows) <= 0) { strcpy(pInsertParam->msg, "no any data points"); @@ -1754,13 +1767,19 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow goto _error; } - tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(pTableDataBlock), &maxRows); + int32_t extendedRowSize = getExtendedRowSize(pTableDataBlock); + + tscAllocateMemIfNeed(pTableDataBlock, extendedRowSize, &maxRows); tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW); if (tokenBuf == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + --maxRows; // 1 more row needed to facilitate the SDataRow/SKVRow convert + ASSERT(maxRows > 0); + + bool isConverted = false; while ((readLen = tgetline(&line, &n, fp)) != -1) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { line[--readLen] = 0; @@ -1773,19 +1792,23 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow char *lineptr = line; strtolower(line, line); - int32_t len = 0; - code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam); + code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &isConverted, extendedRowSize, tokenBuf, + pInsertParam); if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) { pSql->res.code = code; break; } - pTableDataBlock->size += len; + pTableDataBlock->size += extendedRowSize; if (++count >= maxRows) { break; } } + if (isConverted) { + void *convertedSMemRow = pTableDataBlock->pData + pTableDataBlock->size; + memcpy(convertedSMemRow - extendedRowSize, convertedSMemRow, memRowTLen(convertedSMemRow)); + } tfree(tokenBuf); tfree(line); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e6c4e12e34cee28b0402fe3104d298a4080e6e1c..a36562371899d718c00edaa230eebbba62f1723a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2018,18 +2018,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI } } else { for (int32_t i = 0; i < numOfRows; ++i) { - char* payload = (blkKeyTuple + i)->payloadAddr; - if (isNeedConvertRow(payload)) { - convertSMemRow(pDataBlock, payload, pTableDataBlock); - TDRowTLenT rowTLen = memRowTLen(pDataBlock); - pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); - pBlock->dataLen += rowTLen; - } else { - TDRowTLenT rowTLen = memRowTLen(payload); - memcpy(pDataBlock, payload, rowTLen); - pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); - pBlock->dataLen += rowTLen; - } + char* payload = (blkKeyTuple + i)->payloadAddr; + TDRowTLenT rowTLen = memRowTLen(payload); + memcpy(pDataBlock, payload, rowTLen); + pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); + pBlock->dataLen += rowTLen; } }