/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "parInsertUtil.h"
#include "parInt.h"
#include "parToken.h"
#include "query.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

typedef struct SKvParam {
  int16_t  pos;
  SArray*  pTagVals;
  SSchema* schema;
  char     buf[TSDB_MAX_TAGS_LEN];
} SKvParam;

/**
 * Build the final statement output from the per-table data blocks.
 *
 * When pBlockHash is non-empty its blocks are first merged via
 * insMergeTableDataBlocks(); the (possibly NULL) merged list is then handed to
 * insBuildOutput(), which fills pDataBlocks of the SVnodeModifOpStmt stored in
 * pQuery->pRoot. The intermediate merged list is always destroyed here.
 *
 * @return TSDB_CODE_SUCCESS or the first error code reported by a helper.
 */
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pVgDataBlocks = NULL;

  // merge according to vgId
  if (taosHashGetSize(pBlockHash) > 0) {
    code = insMergeTableDataBlocks(pBlockHash, &pVgDataBlocks);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = insBuildOutput(pVgHash, pVgDataBlocks, &((SVnodeModifOpStmt*)pQuery->pRoot)->pDataBlocks);
  }

  insDestroyBlockArrayList(pVgDataBlocks);
  return code;
}

/**
 * Convert the bound tag values into a tag object and attach a create-table
 * request for child table tName to the data block.
 *
 * Only element 0 of each bind entry (is_null[0], length[0], buffer) is read.
 * Entries flagged NULL are skipped. JSON tags are parsed through
 * parseJsontoTagData(); NCHAR tags are converted with taosMbsToUcs4() into a
 * temporary buffer that is released at the end of this function; all other
 * types are copied as raw bytes.
 *
 * @param pBlock      STableDataBlocks* the request is attached to.
 * @param boundTags   SParsedDataColInfo* describing the bound tag columns.
 * @param msgBuf      Buffer receiving a human-readable error message.
 * @param msgBufLen   Size of msgBuf.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when boundTags is NULL,
 *         TSDB_CODE_OUT_OF_MEMORY, or an error produced by a helper.
 */
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
                           TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             code = TSDB_CODE_SUCCESS;
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
    return buildInvalidOperationMsg(&pBuf, "out of memory");
  }

  SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
  if (!tagName) {
    code = buildInvalidOperationMsg(&pBuf, "out of memory");
    goto end;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);

  bool  isJson = false;
  STag* pTag = NULL;

  for (int c = 0; c < tags->numOfBound; ++c) {
    if (bind[c].is_null && bind[c].is_null[0]) {
      continue;
    }

    SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
    int32_t  colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
      if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
        code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
        goto end;
      }
    }

    if (NULL == taosArrayPush(tagName, pTagSchema->name)) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }

    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
      if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
        goto end;
      }

      isJson = true;
      // The bound buffer carries an explicit length; make a zero-terminated
      // copy (calloc + colLen + 1) before handing it to the JSON parser.
      char* tmp = taosMemoryCalloc(1, colLen + 1);
      if (NULL == tmp) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      memcpy(tmp, bind[c].buffer, colLen);
      code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
      taosMemoryFree(tmp);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
    } else {
      STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
      if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
        // Borrowed pointer: the caller's buffer is referenced, not copied.
        val.pData = (uint8_t*)bind[c].buffer;
        val.nData = colLen;
      } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t output = 0;
        void*   p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
        if (p == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto end;
        }
        if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
          if (errno == E2BIG) {
            taosMemoryFree(p);
            code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
            goto end;
          }
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
          taosMemoryFree(p);
          code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
          goto end;
        }
        // Owned by the array entry from here on; released under `end`.
        val.pData = p;
        val.nData = output;
      } else {
        memcpy(&val.i64, bind[c].buffer, colLen);
      }

      if (NULL == taosArrayPush(pTagArray, &val)) {
        // The entry never reached the array, so the cleanup loop below will
        // not see the NCHAR conversion buffer; release it here.
        if (TSDB_DATA_TYPE_NCHAR == pTagSchema->type) {
          taosMemoryFree(val.pData);
        }
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
    }
  }

  // For JSON, pTag was already produced by parseJsontoTagData().
  if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SVCreateTbReq tbReq = {0};
  insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags,
                      TSDB_DEFAULT_TABLE_TTL);
  code = insBuildCreateTbMsg(pDataBlock, &tbReq);
  tdDestroySVCreateTbReq(&tbReq);

end:
  // Release the per-value buffers of NCHAR entries held in the array.
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
      taosMemoryFreeClear(p->pData);
    }
  }
  taosArrayDestroy(pTagArray);
  taosArrayDestroy(tagName);

  return code;
}

/**
 * Append bind->num rows to the data block, taking one TAOS_MULTI_BIND entry
 * per bound column (bind[c] supplies column c for every row).
 *
 * Every bind entry must carry the same row count as bind[0], and its
 * buffer_type must equal the column's schema type for non-NULL values. A NULL
 * value for the primary timestamp column is rejected.
 *
 * @param pBlock     STableDataBlocks* receiving the rows.
 * @param msgBuf     Buffer receiving a human-readable error message.
 * @param msgBufLen  Size of msgBuf.
 * @return TSDB_CODE_SUCCESS or an error code from validation / a helper.
 */
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = insGetExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             rowNum = bind->num;

  CHECK_CODE(
      insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num));

  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);

    for (int c = 0; c < spd->numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[spd->boundColumns[c]];

      if (bind[c].num != rowNum) {
        return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      }

      param.schema = pColSchema;
      insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      if (bind[c].is_null && bind[c].is_null[r]) {
        if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
          return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
        }

        CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, &param));
      } else {
        if (bind[c].buffer_type != pColSchema->type) {
          return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
        }

        int32_t colLen = pColSchema->bytes;
        if (IS_VAR_DATA_TYPE(pColSchema->type)) {
          colLen = bind[c].length[r];
        }

        // Row r of column c lives at a fixed stride of buffer_length bytes.
        CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        insCheckTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      pBuilder->hasNone = true;
    }
    tdSRowEnd(pBuilder);
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
    tdSRowPrint(row, pSTSchema, __func__);
    taosMemoryFree(pSTSchema);
#endif
    pDataBlock->size += extendedRowSize;
  }

  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
  return insSetBlockInfo(pBlocks, pDataBlock, bind->num, &pBuf);
}

/**
 * Column-at-a-time variant of qBindStmtColsValue(): writes the values of the
 * single bound column colIdx into bind->num rows.
 *
 * The call with colIdx == 0 initialises the row builder and reserves memory
 * for all rows; the call with colIdx == numOfBound - 1 finishes each row,
 * advances pDataBlock->size and fills in the block info. Rows are addressed at
 * pData + size + extendedRowSize * r, so size is only advanced on the last
 * column.
 *
 * @param bind    Bind data for this one column.
 * @param colIdx  Index into the bound-column list.
 * @param rowNum  Expected row count; bind->num must match it.
 * @return TSDB_CODE_SUCCESS or an error code from validation / a helper.
 */
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = insGetExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  bool                rowStart = (0 == colIdx);
  bool                rowEnd = ((colIdx + 1) == spd->numOfBound);

  if (rowStart) {
    CHECK_CODE(
        insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  }

  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r);  // skip the SSubmitBlk header
    if (rowStart) {
      tdSRowResetBuf(pBuilder, row);
    } else {
      tdSRowGetBuf(pBuilder, row);
    }

    SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]];

    if (bind->num != rowNum) {
      return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
    }

    param.schema = pColSchema;
    insGetSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

    if (bind->is_null && bind->is_null[r]) {
      if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
      }

      CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, &param));
    } else {
      if (bind->buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

      int32_t colLen = pColSchema->bytes;
      if (IS_VAR_DATA_TYPE(pColSchema->type)) {
        colLen = bind->length[r];
      }

      CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, &param));
    }

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      insCheckTimestamp(pDataBlock, (const char*)&tsKey);
    }

    // set the null value for the columns that do not assign values
    if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      pBuilder->hasNone = true;
    }
    if (rowEnd) {
      tdSRowEnd(pBuilder);
    }
#ifdef TD_DEBUG_PRINT_ROW
    if (rowEnd) {
      STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
      tdSRowPrint(row, pSTSchema, __func__);
      taosMemoryFree(pSTSchema);
    }
#endif
  }

  if (rowEnd) {
    pDataBlock->size += extendedRowSize * bind->num;

    SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
    CHECK_CODE(insSetBlockInfo(pBlocks, pDataBlock, bind->num, &pBuf));
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Describe the bound columns as an array of TAOS_FIELD_E.
 *
 * When fields is non-NULL a zeroed array of numOfBound entries is allocated
 * into *fields and filled with each bound column's name, type and byte
 * width. timePrec is stored in entry 0 only, and only when the first bound
 * column is a timestamp. *fieldNum is always set to numOfBound.
 *
 * Both callers in this file return early when numOfBound <= 0, so
 * boundColumns[0] is read only for a non-empty bound list.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
                         uint8_t timePrec) {
  if (fields) {
    // Size the allocation from the element type actually stored in *fields.
    *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(**fields));
    if (NULL == *fields) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    SSchema* schema = &pSchema[boundInfo->boundColumns[0]];
    if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
      (*fields)[0].precision = timePrec;
    }

    for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
      schema = &pSchema[boundInfo->boundColumns[i]];
      // Bounded copy; always NUL-terminates within the destination array.
      snprintf((*fields)[i].name, sizeof((*fields)[i].name), "%s", schema->name);
      (*fields)[i].type = schema->type;
      (*fields)[i].bytes = schema->bytes;
    }
  }

  *fieldNum = boundInfo->numOfBound;

  return TSDB_CODE_SUCCESS;
}

/**
 * Report the bound tag columns of the block's table.
 *
 * @param boundTags  SParsedDataColInfo* describing the bound tags.
 * @param fieldNum   Receives the number of bound tags.
 * @param fields     Optional; receives a newly allocated field array, or NULL
 *                   when no tag is bound.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when boundTags is NULL,
 *         TSDB_CODE_TSC_STMT_API_ERROR when the table is neither a super table
 *         nor a child table, or an error from buildBoundFields().
 */
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) {
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    // fields is optional, matching qBuildStmtColFields() and buildBoundFields().
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0));

  return TSDB_CODE_SUCCESS;
}

/**
 * Report the bound data columns of the block's table.
 *
 * @param fieldNum  Receives the number of bound columns.
 * @param fields    Optional; receives a newly allocated field array, or NULL
 *                  when no column is bound.
 * @return TSDB_CODE_SUCCESS or an error from buildBoundFields().
 */
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*          pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
    *fieldNum = 0;
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields,
                              pDataBlock->pTableMeta->tableInfo.precision));

  return TSDB_CODE_SUCCESS;
}

/**
 * Reset a data block to its initial, empty state.
 *
 * With keepBuf the current payload is freed and replaced by a fresh
 * TSDB_PAYLOAD_SIZE buffer whose SSubmitBlk header is zeroed. Without keepBuf
 * the payload pointer is merely set to NULL and NOT freed; qCloneStmtDataBlock()
 * relies on this because at that point the pointer still aliases the source
 * block's buffer.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
  STableDataBlocks* pBlock = (STableDataBlocks*)block;

  if (keepBuf) {
    taosMemoryFreeClear(pBlock->pData);
    pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
    if (NULL == pBlock->pData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memset(pBlock->pData, 0, sizeof(SSubmitBlk));
  } else {
    pBlock->pData = NULL;
  }

  pBlock->ordered = true;
  pBlock->prevTS = INT64_MIN;
  pBlock->size = sizeof(SSubmitBlk);
  pBlock->tsSource = -1;
  pBlock->numOfTables = 1;
  pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
  pBlock->headerSize = pBlock->size;
  pBlock->createTbReqLen = 0;

  memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));

  return TSDB_CODE_SUCCESS;
}

/**
 * Create a copy of the data block pSrc in *pDst.
 *
 * The struct is copied bytewise and marked cloned; the table meta, if any, is
 * duplicated so the clone owns its own copy. The clone is then reset without a
 * payload buffer (pData == NULL).
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY (in which case *pDst is
 *         NULL).
 */
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
  *pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
  ((STableDataBlocks*)(*pDst))->cloned = true;

  STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst);
  if (pBlock->pTableMeta) {
    void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta));
    if (NULL == pNewMeta) {
      taosMemoryFreeClear(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta));
    pBlock->pTableMeta = pNewMeta;
  }

  return qResetStmtDataBlock(*pDst, false);
}

/**
 * Clone pSrc into *pDst, give the clone its own payload buffer of nAllocSize
 * bytes (SSubmitBlk header zeroed) and retarget it to the given uid / vgId.
 *
 * @return TSDB_CODE_SUCCESS, or an error code; on payload allocation failure
 *         the clone is released and *pDst is set to NULL.
 */
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
  int32_t code = qCloneStmtDataBlock(pDst, pSrc);
  if (code) {
    return code;
  }

  STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
  pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
  if (NULL == pBlock->pData) {
    qFreeStmtDataBlock(pBlock);
    *pDst = NULL;  // do not hand a freed pointer back to the caller
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pBlock->vgId = vgId;

  if (pBlock->pTableMeta) {
    pBlock->pTableMeta->uid = uid;
    pBlock->pTableMeta->vgId = vgId;
  }

  memset(pBlock->pData, 0, sizeof(SSubmitBlk));

  return TSDB_CODE_SUCCESS;
}

/** Return the table meta pointer stored in the data block (not a copy). */
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; }

/**
 * Free a data block together with its table meta and payload buffer.
 * Accepts NULL. No other members of the block are released here.
 */
void qFreeStmtDataBlock(void* pDataBlock) {
  if (pDataBlock == NULL) {
    return;
  }

  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta);
  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
  taosMemoryFreeClear(pDataBlock);
}

/**
 * Destroy a data block through insDestroyDataBlock() after clearing its
 * cloned flag. Accepts NULL.
 */
void qDestroyStmtDataBlock(void* pBlock) {
  if (pBlock == NULL) {
    return;
  }

  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;

  pDataBlock->cloned = false;
  insDestroyDataBlock(pDataBlock);
}