From 1800c2e846e290ede088dc313a64d2dd95f99636 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 19 Jul 2021 16:50:25 +0800 Subject: [PATCH] support payload type for both prepare and non-prepare mode --- src/client/inc/tscUtil.h | 1 + src/client/inc/tsclient.h | 9 ++ src/client/src/tscParseInsert.c | 4 +- src/client/src/tscPrepare.c | 225 ++------------------------------ src/client/src/tscUtil.c | 86 ++++++++---- 5 files changed, 79 insertions(+), 246 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index abc3d47e0b..9557d3afda 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -108,6 +108,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); +void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf); int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 37a6b4b051..eeb7cc68aa 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -159,6 +159,7 @@ typedef struct SInsertStatementParam { SHashObj *pTableBlockHashList; // data block for each table SArray *pDataBlocks; // SArray. Merged submit block for each vgroup int8_t schemaAttached; // denote if submit block is built with table schema or not + uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert STagData tagData; // NOTE: pTagData->data is used as a variant length array int32_t batchSize; // for parameter ('?') binding and batch processing @@ -170,6 +171,14 @@ typedef struct SInsertStatementParam { char *sql; // current sql statement position } SInsertStatementParam; +typedef enum { + PAYLOAD_TYPE_KV = 0, + PAYLOAD_TYPE_RAW = 1, +} EPayloadType; + +#define IS_RAW_PAYLOAD(t) \ + (((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert + // TODO extract sql parser supporter typedef struct { int command; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index c3bf3605b2..adc24dfbab 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1069,9 +1069,8 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, } } -#if 0 // data block is disordered, sort it in ascending order -static void tscSortRemoveDataBlockDupRowsOld(STableDataBlocks *dataBuf) { +void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; // size is less than the total size, since duplicated rows may be removed yet. @@ -1114,7 +1113,6 @@ static void tscSortRemoveDataBlockDupRowsOld(STableDataBlocks *dataBuf) { dataBuf->prevTS = INT64_MIN; } -#endif // data block is disordered, sort it in ascending order int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) { diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 36c10f6cd5..d39f9ebe24 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -291,7 +291,6 @@ static char* normalStmtBuildSql(STscStmt* stmt) { return taosStringBuilderGetResult(&sb, NULL); } -#if 0 static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { SParsedDataColInfo* spd = &pBlock->boundColumnInfo; int32_t offset = 0; @@ -319,129 +318,8 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { return TSDB_CODE_SUCCESS; } -#endif - -/** - * input: - * - schema: - * - payload: - * - spd: - * output: - * - pBlock with data block replaced by K-V format - */ -static int refactorPayload(STableDataBlocks* pBlock, int32_t rowNum) { - SParsedDataColInfo* spd = &pBlock->boundColumnInfo; - SSchema* schema = (SSchema*)pBlock->pTableMeta->schema; - SMemRowHelper* pHelper = &pBlock->rowHelper; - STableMeta* pTableMeta = pBlock->pTableMeta; - STableComInfo tinfo = tscGetTableInfo(pTableMeta); - int code = TSDB_CODE_SUCCESS; - int32_t extendedRowSize = getExtendedRowSize(&tinfo); - TDRowTLenT destPayloadSize = sizeof(SSubmitBlk); - - ASSERT(pHelper->allNullLen >= 8); - - TDRowTLenT destAllocSize = sizeof(SSubmitBlk) + rowNum * extendedRowSize; - SSubmitBlk* pDestBlock = tcalloc(destAllocSize, 1); - if (pDestBlock == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - memcpy(pDestBlock, pBlock->pData, sizeof(SSubmitBlk)); - char* destPayload = (char*)pDestBlock + sizeof(SSubmitBlk); - - char* srcPayload = (char*)pBlock->pData + sizeof(SSubmitBlk); - - for (int n = 0; n < rowNum; ++n) { - payloadSetNCols(destPayload, spd->numOfBound); - - TDRowTLenT dataRowLen = pHelper->allNullLen; - TDRowTLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE + sizeof(SColIdx) * spd->numOfBound; - TDRowTLenT payloadValOffset = payloadValuesOffset(destPayload); // rely on payloadNCols - TDRowLenT colValOffset = 0; - - char* kvPrimaryKeyStart = destPayload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple - char* kvStart = kvPrimaryKeyStart + PAYLOAD_COL_HEAD_LEN; // the column tuple behind the primaryKey - - for (int32_t i = 0; i < spd->numOfBound; ++i) { - int32_t colIndex = spd->boundedColumns[i]; - ASSERT(spd->cols[colIndex].hasVal); - char* start = srcPayload + spd->cols[colIndex].offset; - SSchema* pSchema = &schema[colIndex]; // get colId here - bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); - - // the primary key locates in 1st column - if (!IS_DATA_COL_ORDERED(spd->orderStatus)) { - ASSERT(spd->colIdxInfo != NULL); - if (!isPrimaryKey) { - kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN); - } else { - ASSERT(spd->colIdxInfo[i].finalIdx == 0); - } - } - if (isPrimaryKey) { - payloadColSetId(kvPrimaryKeyStart, pSchema->colId); - payloadColSetType(kvPrimaryKeyStart, pSchema->type); - payloadColSetOffset(kvPrimaryKeyStart, colValOffset); - memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]); - colValOffset += TYPE_BYTES[pSchema->type]; - kvRowLen += TYPE_BYTES[pSchema->type]; - } else { - payloadColSetId(kvStart, pSchema->colId); - payloadColSetType(kvStart, pSchema->type); - payloadColSetOffset(kvStart, colValOffset); - if (IS_VAR_DATA_TYPE(pSchema->type)) { - varDataCopy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start); - colValOffset += varDataTLen(start); - kvRowLen += varDataTLen(start); - if (pSchema->type == TSDB_DATA_TYPE_BINARY) { - dataRowLen += (varDataLen(start) - CHAR_BYTES); - } else if (pSchema->type == TSDB_DATA_TYPE_NCHAR) { - dataRowLen += (varDataLen(start) - TSDB_NCHAR_SIZE); - } else { - ASSERT(0); - } - } else { - memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]); - colValOffset += TYPE_BYTES[pSchema->type]; - kvRowLen += TYPE_BYTES[pSchema->type]; - } - - if (IS_DATA_COL_ORDERED(spd->orderStatus)) { - kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column - } - } - - - } // end of column - - if (kvRowLen < dataRowLen) { - payloadSetType(destPayload, SMEM_ROW_KV); - } else { - payloadSetType(destPayload, SMEM_ROW_DATA); - } - ASSERT(colValOffset <= TSDB_MAX_BYTES_PER_ROW); - TDRowTLenT len = payloadValOffset + colValOffset; - payloadSetTLen(destPayload, len); - - // next loop - srcPayload += pBlock->rowSize; - destPayload += len; - - destPayloadSize += len; - } // end of row - - ASSERT(destPayloadSize <= destAllocSize); - - tfree(pBlock->pData); - pBlock->pData = (char*)pDestBlock; - pBlock->nAllocSize = destAllocSize; - pBlock->size = destPayloadSize; - - return code; -} -#if 0 int32_t fillTablesColumnsNull(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; @@ -464,98 +342,17 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) { return TSDB_CODE_SUCCESS; } -#endif - -/** - * check and sort - */ -static int initPayloadEnv(STableDataBlocks* pBlock, int32_t rowNum) { - SParsedDataColInfo* spd = &pBlock->boundColumnInfo; - if (spd->orderStatus != ORDER_STATUS_UNKNOWN) { - return TSDB_CODE_SUCCESS; - } - - bool isOrdered = true; - int32_t lastColIdx = -1; - for (int32_t i = 0; i < spd->numOfBound; ++i) { - ASSERT(spd->cols[i].hasVal); - int32_t colIdx = spd->boundedColumns[i]; - if (isOrdered) { - if (lastColIdx > colIdx) { - isOrdered = false; - break; - } else { - lastColIdx = colIdx; - } - } - } - - spd->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; - - if (isOrdered) { - spd->colIdxInfo = NULL; - } else { - spd->colIdxInfo = calloc(spd->numOfBound, sizeof(SBoundIdxInfo)); - if (spd->colIdxInfo == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - SBoundIdxInfo* pColIdx = spd->colIdxInfo; - for (uint16_t i = 0; i < spd->numOfBound; ++i) { - pColIdx[i].schemaColIdx = (uint16_t)spd->boundedColumns[i]; - pColIdx[i].boundIdx = i; - } - qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); - for (uint16_t i = 0; i < spd->numOfBound; ++i) { - pColIdx[i].finalIdx = i; - } - qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); - } - - return TSDB_CODE_SUCCESS; -} - -/** - * Refactor the raw payload structure to K-V format as the in tsParseOneRow() - */ -int32_t fillTablesPayload(SSqlObj* pSql) { - SSqlCmd* pCmd = &pSql->cmd; - int code = TSDB_CODE_SUCCESS; - STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); - STableDataBlocks* pOneTableBlock = *p; - while (pOneTableBlock) { - SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData; - if (pBlocks->numOfRows > 0) { - initSMemRowHelper(&pOneTableBlock->rowHelper, tscGetTableSchema(pOneTableBlock->pTableMeta), - tscGetNumOfColumns(pOneTableBlock->pTableMeta), 0); - if ((code = initPayloadEnv(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) { - return code; - } - if ((code = refactorPayload(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) { - return code; - }; - } - - p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p); - if (p == NULL) { - break; - } - - pOneTableBlock = *p; +//////////////////////////////////////////////////////////////////////////////// +// functions for insertion statement preparation +static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { + if (bind->is_null != NULL && *(bind->is_null)) { + setNull(data + param->offset, param->type, param->bytes); + return TSDB_CODE_SUCCESS; } - return code; -} - //////////////////////////////////////////////////////////////////////////////// - // functions for insertion statement preparation - static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { - if (bind->is_null != NULL && *(bind->is_null)) { - setNull(data + param->offset, param->type, param->bytes); - return TSDB_CODE_SUCCESS; - } - #if 0 if (0) { // allow user bind param data with different type @@ -1309,12 +1106,9 @@ static int insertStmtExecute(STscStmt* stmt) { pBlk->uid = pTableMeta->id.uid; pBlk->tid = pTableMeta->id.tid; - int code = fillTablesPayload(stmt->pSql); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + fillTablesColumnsNull(stmt->pSql); - code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false); + int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1391,7 +1185,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { return TSDB_CODE_TSC_APP_ERROR; } - fillTablesPayload(pStmt->pSql); + fillTablesColumnsNull(pStmt->pSql); if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { return code; @@ -2012,6 +1806,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) { pStmt->last = STMT_EXECUTE; + pStmt->pSql->cmd.insertParam.payloadType = PAYLOAD_TYPE_RAW; if (pStmt->multiTbInsert) { ret = insertBatchStmtExecute(pStmt); } else { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2c82f767bd..26e3a8915d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1860,7 +1860,8 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { } // Erase the empty space reserved for binary data -static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema, SBlockKeyTuple *blkKeyTuple) { +static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam, + SBlockKeyTuple* blkKeyTuple) { // TODO: optimize this function, handle the case while binary is not presented STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableComInfo tinfo = tscGetTableInfo(pTableMeta); @@ -1873,7 +1874,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo int32_t flen = 0; // original total length of row // schema needs to be included into the submit data block - if (includeSchema) { + if (insertParam->schemaAttached) { int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta); for(int32_t j = 0; j < numOfCols; ++j) { STColumn* pCol = (STColumn*) pDataBlock; @@ -1900,18 +1901,38 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo pBlock->dataLen = 0; int32_t numOfRows = htons(pBlock->numOfRows); - SMemRowBuilder rowBuilder; - rowBuilder.pSchema = pSchema; - rowBuilder.sversion = pTableMeta->sversion; - rowBuilder.flen = flen; - rowBuilder.nCols = tinfo.numOfColumns; - rowBuilder.pDataBlock = pDataBlock; - rowBuilder.pSubmitBlk = pBlock; - rowBuilder.buf = p; + if (IS_RAW_PAYLOAD(insertParam->payloadType)) { + for (int32_t i = 0; i < numOfRows; ++i) { + SMemRow memRow = (SMemRow)pDataBlock; + memRowSetType(memRow, SMEM_ROW_DATA); + SDataRow trow = memRowDataBody(memRow); + dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen)); + dataRowSetVersion(trow, pTableMeta->sversion); + + int toffset = 0; + for (int32_t j = 0; j < tinfo.numOfColumns; j++) { + tdAppendColVal(trow, p, pSchema[j].type, toffset); + toffset += TYPE_BYTES[pSchema[j].type]; + p += pSchema[j].bytes; + } + + pDataBlock = (char*)pDataBlock + memRowTLen(memRow); + pBlock->dataLen += memRowTLen(memRow); + } + } else { + SMemRowBuilder rowBuilder; + rowBuilder.pSchema = pSchema; + rowBuilder.sversion = pTableMeta->sversion; + rowBuilder.flen = flen; + rowBuilder.nCols = tinfo.numOfColumns; + rowBuilder.pDataBlock = pDataBlock; + rowBuilder.pSubmitBlk = pBlock; + rowBuilder.buf = p; - for (int32_t i = 0; i < numOfRows; ++i) { - rowBuilder.buf = (blkKeyTuple + i)->payloadAddr; - tdGenMemRowFromBuilder(&rowBuilder); + for (int32_t i = 0; i < numOfRows; ++i) { + rowBuilder.buf = (blkKeyTuple + i)->payloadAddr; + tdGenMemRowFromBuilder(&rowBuilder); + } } int32_t len = pBlock->dataLen + pBlock->schemaLen; @@ -1959,6 +1980,7 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) { const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; + bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); @@ -1967,7 +1989,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl STableDataBlocks* pOneTableBlock = *p; SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock - + while(pOneTableBlock) { SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; if (pBlocks->numOfRows > 0) { @@ -2008,21 +2030,29 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl } } - if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { - taosHashCleanup(pVnodeDataBlockHashList); - tscDestroyBlockArrayList(pVnodeDataBlockList); - tfree(dataBuf->pData); - tfree(blkKeyInfo.pKeyTuple); - return code; - } - - ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0); + if (isRawPayload) { + tscSortRemoveDataBlockDupRowsRaw(pOneTableBlock); + char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize * (pBlocks->numOfRows - 1); - SBlockKeyTuple* pLastKeyTuple = blkKeyInfo.pKeyTuple + pBlocks->numOfRows - 1; - tscDebug("0x%" PRIx64 " name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, - pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid, pBlocks->numOfRows, - pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey); + tscDebug("0x%" PRIx64 " name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, + pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid, + pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); + } else { + if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { + taosHashCleanup(pVnodeDataBlockHashList); + tscDestroyBlockArrayList(pVnodeDataBlockList); + tfree(dataBuf->pData); + tfree(blkKeyInfo.pKeyTuple); + return code; + } + ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0); + SBlockKeyTuple* pLastKeyTuple = blkKeyInfo.pKeyTuple + pBlocks->numOfRows - 1; + tscDebug("0x%" PRIx64 " name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, + pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid, + pBlocks->numOfRows, pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey); + } + int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); pBlocks->tid = htonl(pBlocks->tid); @@ -2032,7 +2062,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl pBlocks->schemaLen = 0; // erase the empty space reserved for binary data - int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam->schemaAttached, blkKeyInfo.pKeyTuple); + int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam, blkKeyInfo.pKeyTuple); assert(finalLen <= len); dataBuf->size += (finalLen + sizeof(SSubmitBlk)); -- GitLab