diff --git a/include/common/trow.h b/include/common/trow.h index c5293fa45fde603c74da3416db4108e3c2f196aa..9d183c8f80d926c9f646b83f73a0cbadacbd5afe 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -169,7 +169,7 @@ typedef struct { // #define TD_ROW_VER(r) ((r)->ver) #define TD_ROW_KEY_ADDR(r) (r) -// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and +// N.B. If without STSchema, insGetExtendedRowSize() is used to get the rowMaxBytes and // (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined. #define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) ((s)->tlen + TD_ROW_HEAD_LEN) diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h deleted file mode 100644 index 6d0ba29eb71357b82371566f3e5024e2fcaff239..0000000000000000000000000000000000000000 --- a/source/libs/parser/inc/parInsertData.h +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_DATABLOCKMGT_H -#define TDENGINE_DATABLOCKMGT_H - -#include "catalog.h" -#include "os.h" -#include "query.h" -#include "tname.h" -#include "ttypes.h" - -#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) - -typedef enum EOrderStatus { - ORDER_STATUS_UNKNOWN = 0, - ORDER_STATUS_ORDERED = 1, - ORDER_STATUS_DISORDERED = 2, -} EOrderStatus; - -typedef enum EValStat { - VAL_STAT_HAS = 0x0, // 0 means has val - VAL_STAT_NONE = 0x01, // 1 means no val -} EValStat; - -typedef struct SBoundColumn { - int32_t offset; // all column offset value - int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future - uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val) -} SBoundColumn; - -typedef struct { - col_id_t schemaColIdx; - col_id_t boundIdx; - col_id_t finalIdx; -} SBoundIdxInfo; - -typedef struct SParsedDataColInfo { - col_id_t numOfCols; - col_id_t numOfBound; - uint16_t flen; // TODO: get from STSchema - uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow) - uint16_t extendedVarLen; - uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part) - col_id_t *boundColumns; // bound column idx according to schema - SBoundColumn *cols; - SBoundIdxInfo *colIdxInfo; - int8_t orderStatus; // bound columns -} SParsedDataColInfo; - -typedef struct { - uint8_t rowType; // default is 0, that is SDataRow - int32_t rowSize; -} SMemRowBuilder; - -typedef struct STableDataBlocks { - int8_t tsSource; // where does the UNIX timestamp come from, server or client - bool ordered; // if current rows are ordered or not - int32_t vgId; // virtual group id - int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending - int32_t numOfTables; // number of tables in current submit block - int32_t rowSize; // row size for current table - uint32_t nAllocSize; - uint32_t headerSize; // header for table info (uid, tid, submit metadata) - uint32_t size; - STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to - // avoid to be removed from cache - char *pData; - bool cloned; - int32_t createTbReqLen; - SParsedDataColInfo boundColumnInfo; - SRowBuilder rowBuilder; -} STableDataBlocks; - -static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { - STableComInfo *pTableInfo = &pBlock->pTableMeta->tableInfo; - ASSERT(pBlock->rowSize == pTableInfo->rowSize); - return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen + - (int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1); -} - -static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, - col_id_t *colIdx) { - col_id_t schemaIdx = 0; - if (IS_DATA_COL_ORDERED(spd)) { - schemaIdx = spd->boundColumns[idx]; - if (TD_IS_TP_ROW_T(rowType)) { - *toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart - *colIdx = schemaIdx; - } else { - *toffset = idx * sizeof(SKvRowIdx); // the offset of SKvRowIdx - *colIdx = idx; - } - } else { - ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx); - schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx; - if (TD_IS_TP_ROW_T(rowType)) { - *toffset = (spd->cols + schemaIdx)->toffset; - *colIdx = schemaIdx; - } else { - *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SKvRowIdx); - *colIdx = (spd->colIdxInfo + idx)->finalIdx; - } - } -} - -static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows) { - pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid); - pBlocks->uid = dataBuf->pTableMeta->uid; - pBlocks->sversion = dataBuf->pTableMeta->sversion; - pBlocks->schemaLen = dataBuf->createTbReqLen; - - if (pBlocks->numOfRows + numOfRows >= INT32_MAX) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } else { - pBlocks->numOfRows += numOfRows; - return TSDB_CODE_SUCCESS; - } -} - -int32_t schemaIdxCompar(const void *lhs, const void *rhs); -int32_t boundIdxCompar(const void *lhs, const void *rhs); -void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols); -void destroyBlockArrayList(SArray *pDataBlockList); -void destroyBlockHashmap(SHashObj *pDataBlockHash); -int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo); -int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows); -int32_t getDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset, - int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks, SArray *pBlockList, - SVCreateTbReq *pCreateTbReq); -int32_t mergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks); -int32_t buildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq); - -int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); - -#endif // TDENGINE_DATABLOCKMGT_H diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h new file mode 100644 index 0000000000000000000000000000000000000000..78e87d847ca52de74ac1aa7d5a0e671103364149 --- /dev/null +++ b/source/libs/parser/inc/parInsertUtil.h @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_PAR_INSERT_UTIL_H +#define TDENGINE_PAR_INSERT_UTIL_H + +#include "parUtil.h" + +struct SToken; + +#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) + +#define NEXT_TOKEN(pSql, sToken) \ + do { \ + int32_t index = 0; \ + sToken = tStrGetToken(pSql, &index, false); \ + pSql += index; \ + } while (0) + +#define CHECK_CODE(expr) \ + do { \ + int32_t code = expr; \ + if (TSDB_CODE_SUCCESS != code) { \ + return code; \ + } \ + } while (0) + +typedef enum EOrderStatus { + ORDER_STATUS_UNKNOWN = 0, + ORDER_STATUS_ORDERED = 1, + ORDER_STATUS_DISORDERED = 2, +} EOrderStatus; + +typedef enum EValStat { + VAL_STAT_HAS = 0x0, // 0 means has val + VAL_STAT_NONE = 0x01, // 1 means no val +} EValStat; + +typedef struct SBoundColumn { + int32_t offset; // all column offset value + int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future + uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val) +} SBoundColumn; + +typedef struct { + col_id_t schemaColIdx; + col_id_t boundIdx; + col_id_t finalIdx; +} SBoundIdxInfo; + +typedef struct SParsedDataColInfo { + col_id_t numOfCols; + col_id_t numOfBound; + uint16_t flen; // TODO: get from STSchema + uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow) + uint16_t extendedVarLen; + uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part) + col_id_t *boundColumns; // bound column idx according to schema + SBoundColumn *cols; + SBoundIdxInfo *colIdxInfo; + int8_t orderStatus; // bound columns +} SParsedDataColInfo; + +typedef struct SInsertParseBaseContext { + SParseContext *pComCxt; + char *pSql; + SMsgBuf msg; +} SInsertParseBaseContext; + +typedef struct SInsertParseContext { + SParseContext *pComCxt; // input + char *pSql; // input + SMsgBuf msg; // input + STableMeta *pTableMeta; // each table + SParsedDataColInfo tags; // each table + SVCreateTbReq createTblReq; // each table + SHashObj *pVgroupsHashObj; // global + SHashObj *pTableBlockHashObj; // global + SHashObj *pSubTableHashObj; // global + SArray *pVgDataBlocks; // global + SHashObj *pTableNameHashObj; // global + SHashObj *pDbFNameHashObj; // global + int32_t totalNum; + SVnodeModifOpStmt *pOutput; + SStmtCallback *pStmtCb; + SParseMetaCache *pMetaCache; + char sTableName[TSDB_TABLE_NAME_LEN]; + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW]; + int64_t memElapsed; + int64_t parRowElapsed; +} SInsertParseContext; + +typedef struct SInsertParseSyntaxCxt { + SParseContext *pComCxt; + char *pSql; + SMsgBuf msg; + SParseMetaCache *pMetaCache; +} SInsertParseSyntaxCxt; + +typedef struct SMemParam { + SRowBuilder *rb; + SSchema *schema; + int32_t toffset; + col_id_t colIdx; +} SMemParam; + +typedef struct { + uint8_t rowType; // default is 0, that is SDataRow + int32_t rowSize; +} SMemRowBuilder; + +typedef struct STableDataBlocks { + int8_t tsSource; // where does the UNIX timestamp come from, server or client + bool ordered; // if current rows are ordered or not + int32_t vgId; // virtual group id + int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending + int32_t numOfTables; // number of tables in current submit block + int32_t rowSize; // row size for current table + uint32_t nAllocSize; + uint32_t headerSize; // header for table info (uid, tid, submit metadata) + uint32_t size; + STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to + // avoid to be removed from cache + char *pData; + bool cloned; + int32_t createTbReqLen; + SParsedDataColInfo boundColumnInfo; + SRowBuilder rowBuilder; +} STableDataBlocks; + +int32_t insGetExtendedRowSize(STableDataBlocks *pBlock); +void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, col_id_t *colIdx); +int32_t insSetBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows); +int32_t insSchemaIdxCompar(const void *lhs, const void *rhs); +int32_t insBoundIdxCompar(const void *lhs, const void *rhs); +void insSetBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols); +void insDestroyBlockArrayList(SArray *pDataBlockList); +void insDestroyBlockHashmap(SHashObj *pDataBlockHash); +int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo); +int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset, + int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks, + SArray *pBlockList, SVCreateTbReq *pCreateTbReq); +int32_t insMergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks); +int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq); +int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); +int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf); +int32_t insFindCol(struct SToken *pColname, int32_t start, int32_t end, SSchema *pSchema); +void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname, + SArray *tagName, uint8_t tagNum); +int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param); +int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start); +int32_t insBuildOutput(SInsertParseContext *pCxt); +void insDestroyDataBlock(STableDataBlocks *pDataBlock); + +#endif // TDENGINE_PAR_INSERT_UTIL_H diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c new file mode 100644 index 0000000000000000000000000000000000000000..8be5b86b37bd51a0ae1eb1f7c66584b0c8366c55 --- /dev/null +++ b/source/libs/parser/src/parInsertSml.c @@ -0,0 +1,363 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "parInsertUtil.h" +#include "parInt.h" +#include "parToken.h" +#include "ttime.h" + +int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, + int32_t msgBufLen) { + SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen}; + SToken sToken; + int32_t code = 0; + char* tbName = NULL; + + NEXT_TOKEN(pTableName, sToken); + + if (sToken.n == 0) { + return buildInvalidOperationMsg(&msg, "empty table name"); + } + + code = insCreateSName(pName, &sToken, acctId, dbName, &msg); + if (code) { + return code; + } + + NEXT_TOKEN(pTableName, sToken); + + if (sToken.n > 0) { + return buildInvalidOperationMsg(&msg, "table name format is wrong"); + } + + return TSDB_CODE_SUCCESS; +} + +typedef struct SmlExecTableHandle { + SParsedDataColInfo tags; // each table + SVCreateTbReq createTblReq; // each table +} SmlExecTableHandle; + +typedef struct SmlExecHandle { + SHashObj* pBlockHash; + SmlExecTableHandle tableExecHandle; + SQuery* pQuery; +} SSmlExecHandle; + +static void smlDestroyTableHandle(void* pHandle) { + SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle; + destroyBoundColumnInfo(&handle->tags); + tdDestroySVCreateTbReq(&handle->createTblReq); +} + +static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema, bool isTag) { + col_id_t nCols = pColList->numOfCols; + + pColList->numOfBound = 0; + pColList->boundNullLen = 0; + memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols); + for (col_id_t i = 0; i < nCols; ++i) { + pColList->cols[i].valStat = VAL_STAT_NONE; + } + + bool isOrdered = true; + col_id_t lastColIdx = -1; // last column found + for (int i = 0; i < taosArrayGetSize(cols); ++i) { + SSmlKv* kv = taosArrayGetP(cols, i); + SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; + col_id_t t = lastColIdx + 1; + col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, nCols, pSchema)); + uDebug("SML, index:%d, t:%d, ncols:%d", index, t, nCols); + if (index < 0 && t > 0) { + index = insFindCol(&sToken, 0, t, pSchema); + isOrdered = false; + } + if (index < 0) { + uError("smlBoundColumnData. index:%d", index); + return TSDB_CODE_SML_INVALID_DATA; + } + if (pColList->cols[index].valStat == VAL_STAT_HAS) { + uError("smlBoundColumnData. already set. index:%d", index); + return TSDB_CODE_SML_INVALID_DATA; + } + lastColIdx = index; + pColList->cols[index].valStat = VAL_STAT_HAS; + pColList->boundColumns[pColList->numOfBound] = index; + ++pColList->numOfBound; + switch (pSchema[t].type) { + case TSDB_DATA_TYPE_BINARY: + pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES); + break; + case TSDB_DATA_TYPE_NCHAR: + pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); + break; + default: + pColList->boundNullLen += TYPE_BYTES[pSchema[t].type]; + break; + } + } + + pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; + + if (!isOrdered) { + pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo)); + if (NULL == pColList->colIdxInfo) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + SBoundIdxInfo* pColIdx = pColList->colIdxInfo; + for (col_id_t i = 0; i < pColList->numOfBound; ++i) { + pColIdx[i].schemaColIdx = pColList->boundColumns[i]; + pColIdx[i].boundIdx = i; + } + taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar); + for (col_id_t i = 0; i < pColList->numOfBound; ++i) { + pColIdx[i].finalIdx = i; + } + taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar); + } + + if (pColList->numOfCols > pColList->numOfBound) { + memset(&pColList->boundColumns[pColList->numOfBound], 0, + sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound)); + } + + return TSDB_CODE_SUCCESS; +} + +/** + * @brief No json tag for schemaless + * + * @param cols + * @param tags + * @param pSchema + * @param ppTag + * @param msg + * @return int32_t + */ +static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName, + SMsgBuf* msg) { + SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal)); + if (!pTagArray) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + *tagName = taosArrayInit(8, TSDB_COL_NAME_LEN); + if (!*tagName) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + for (int i = 0; i < tags->numOfBound; ++i) { + SSchema* pTagSchema = &pSchema[tags->boundColumns[i]]; + SSmlKv* kv = taosArrayGetP(cols, i); + + taosArrayPush(*tagName, pTagSchema->name); + STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; + // strcpy(val.colName, pTagSchema->name); + if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) { + val.pData = (uint8_t*)kv->value; + val.nData = kv->length; + } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) { + int32_t output = 0; + void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output)) { + if (errno == E2BIG) { + taosMemoryFree(p); + code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name); + goto end; + } + char buf[512] = {0}; + snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno)); + taosMemoryFree(p); + code = buildSyntaxErrMsg(msg, buf, kv->value); + goto end; + } + val.pData = p; + val.nData = output; + } else { + memcpy(&val.i64, &(kv->value), kv->length); + } + taosArrayPush(pTagArray, &val); + } + + code = tTagNew(pTagArray, 1, false, ppTag); +end: + for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { + STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); + if (p->type == TSDB_DATA_TYPE_NCHAR) { + taosMemoryFree(p->pData); + } + } + taosArrayDestroy(pTagArray); + return code; +} + +int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, + char* tableName, const char* sTableName, int32_t sTableNameLen, char* msgBuf, int16_t msgBufLen) { + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + + SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; + smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table + SSchema* pTagsSchema = getTableTagSchema(pTableMeta); + insSetBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta)); + int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, true); + if (ret != TSDB_CODE_SUCCESS) { + buildInvalidOperationMsg(&pBuf, "bound tags error"); + return ret; + } + STag* pTag = NULL; + SArray* tagName = NULL; + ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &tagName, &pBuf); + if (ret != TSDB_CODE_SUCCESS) { + taosArrayDestroy(tagName); + return ret; + } + + insBuildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, + pTableMeta->tableInfo.numOfTags); + taosArrayDestroy(tagName); + + smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1); + memcpy(smlHandle->tableExecHandle.createTblReq.ctb.stbName, sTableName, sTableNameLen); + smlHandle->tableExecHandle.createTblReq.ctb.stbName[sTableNameLen] = 0; + + STableDataBlocks* pDataBlock = NULL; + ret = insGetDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), + TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, + pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq); + if (ret != TSDB_CODE_SUCCESS) { + buildInvalidOperationMsg(&pBuf, "create data block error"); + return ret; + } + + SSchema* pSchema = getTableColumnSchema(pTableMeta); + + ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema, false); + if (ret != TSDB_CODE_SUCCESS) { + buildInvalidOperationMsg(&pBuf, "bound cols error"); + return ret; + } + int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock); + SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; + SRowBuilder* pBuilder = &pDataBlock->rowBuilder; + SMemParam param = {.rb = pBuilder}; + + insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo); + + int32_t rowNum = taosArrayGetSize(cols); + if (rowNum <= 0) { + return buildInvalidOperationMsg(&pBuf, "cols size <= 0"); + } + ret = insAllocateMemForSize(pDataBlock, extendedRowSize * rowNum); + if (ret != TSDB_CODE_SUCCESS) { + buildInvalidOperationMsg(&pBuf, "allocate memory error"); + return ret; + } + for (int32_t r = 0; r < rowNum; ++r) { + STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header + tdSRowResetBuf(pBuilder, row); + void* rowData = taosArrayGetP(cols, r); + size_t rowDataSize = 0; + if (format) { + rowDataSize = taosArrayGetSize(rowData); + } + + // 1. set the parsed value from sql string + for (int c = 0, j = 0; c < spd->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[spd->boundColumns[c]]; + + param.schema = pColSchema; + insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); + + SSmlKv* kv = NULL; + if (format) { + if (j < rowDataSize) { + kv = taosArrayGetP(rowData, j); + if (rowDataSize != spd->numOfBound && j != 0 && + (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) { + kv = NULL; + } else { + j++; + } + } + } else { + void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); + if (p) kv = *p; + } + + if (kv) { + int32_t colLen = kv->length; + if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { + // uError("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); + kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); + // uError("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); + } + + if (IS_VAR_DATA_TYPE(kv->type)) { + insMemRowAppend(&pBuf, kv->value, colLen, ¶m); + } else { + insMemRowAppend(&pBuf, &(kv->value), colLen, ¶m); + } + } else { + pBuilder->hasNone = true; + } + + if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { + TSKEY tsKey = TD_ROW_KEY(row); + insCheckTimestamp(pDataBlock, (const char*)&tsKey); + } + } + + // set the null value for the columns that do not assign values + if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { + pBuilder->hasNone = true; + } + + tdSRowEnd(pBuilder); + pDataBlock->size += extendedRowSize; + } + + SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); + if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, rowNum)) { + return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX"); + } + + return TSDB_CODE_SUCCESS; +} + +void* smlInitHandle(SQuery* pQuery) { + SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle)); + if (!handle) return NULL; + handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + handle->pQuery = pQuery; + + return handle; +} + +void smlDestroyHandle(void* pHandle) { + if (!pHandle) return; + SSmlExecHandle* handle = (SSmlExecHandle*)pHandle; + insDestroyBlockHashmap(handle->pBlockHash); + smlDestroyTableHandle(&handle->tableExecHandle); + taosMemoryFree(handle); +} + +int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) { + SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; + return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash); +} diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsertSql.c similarity index 60% rename from source/libs/parser/src/parInsert.c rename to source/libs/parser/src/parInsertSql.c index 65cc44b1e9f403c8abe0cae4f9106999447d8b97..84c9664fa91c729fd96ceb8df1bb37bb561cf37c 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsertSql.c @@ -13,22 +13,10 @@ * along with this program. If not, see . */ -#include "os.h" -#include "parInsertData.h" -#include "parInt.h" +#include "parInsertUtil.h" #include "parToken.h" -#include "parUtil.h" -#include "query.h" #include "tglobal.h" #include "ttime.h" -#include "ttypes.h" - -#define NEXT_TOKEN(pSql, sToken) \ - do { \ - int32_t index = 0; \ - sToken = tStrGetToken(pSql, &index, false); \ - pSql += index; \ - } while (0) #define NEXT_TOKEN_WITH_PREV(pSql, sToken) \ do { \ @@ -49,69 +37,11 @@ pSql += sToken.n; \ } while (TK_NK_SPACE == sToken.type) -typedef struct SInsertParseBaseContext { - SParseContext* pComCxt; - char* pSql; - SMsgBuf msg; -} SInsertParseBaseContext; - -typedef struct SInsertParseContext { - SParseContext* pComCxt; // input - char* pSql; // input - SMsgBuf msg; // input - STableMeta* pTableMeta; // each table - SParsedDataColInfo tags; // each table - SVCreateTbReq createTblReq; // each table - SHashObj* pVgroupsHashObj; // global - SHashObj* pTableBlockHashObj; // global - SHashObj* pSubTableHashObj; // global - SArray* pVgDataBlocks; // global - SHashObj* pTableNameHashObj; // global - SHashObj* pDbFNameHashObj; // global - int32_t totalNum; - SVnodeModifOpStmt* pOutput; - SStmtCallback* pStmtCb; - SParseMetaCache* pMetaCache; - char sTableName[TSDB_TABLE_NAME_LEN]; - char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW]; - int64_t memElapsed; - int64_t parRowElapsed; -} SInsertParseContext; - -typedef struct SInsertParseSyntaxCxt { - SParseContext* pComCxt; - char* pSql; - SMsgBuf msg; - SParseMetaCache* pMetaCache; -} SInsertParseSyntaxCxt; - typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; -typedef struct SKvParam { - int16_t pos; - SArray* pTagVals; - SSchema* schema; - char buf[TSDB_MAX_TAGS_LEN]; -} SKvParam; - -typedef struct SMemParam { - SRowBuilder* rb; - SSchema* schema; - int32_t toffset; - col_id_t colIdx; -} SMemParam; - -#define CHECK_CODE(expr) \ - do { \ - int32_t code = expr; \ - if (TSDB_CODE_SUCCESS != code) { \ - return code; \ - } \ - } while (0) - static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) { SToken sToken; NEXT_TOKEN(*pSql, sToken); @@ -125,105 +55,6 @@ static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) { return TSDB_CODE_SUCCESS; } -static char* tableNameGetPosition(SToken* pToken, char target) { - bool inEscape = false; - bool inQuote = false; - char quotaStr = 0; - - for (uint32_t i = 0; i < pToken->n; ++i) { - if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) { - return pToken->z + i; - } - - if (*(pToken->z + i) == TS_ESCAPE_CHAR) { - if (!inQuote) { - inEscape = !inEscape; - } - } - - if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') { - if (!inEscape) { - if (!inQuote) { - quotaStr = *(pToken->z + i); - inQuote = !inQuote; - } else if (quotaStr == *(pToken->z + i)) { - inQuote = !inQuote; - } - } - } - } - - return NULL; -} - -static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) { - const char* msg1 = "name too long"; - const char* msg2 = "invalid database name"; - const char* msg3 = "db is not specified"; - const char* msg4 = "invalid table name"; - - int32_t code = TSDB_CODE_SUCCESS; - char* p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]); - - if (p != NULL) { // db has been specified in sql string so we ignore current db path - assert(*p == TS_PATH_DELIMITER[0]); - - int32_t dbLen = p - pTableName->z; - if (dbLen <= 0) { - return buildInvalidOperationMsg(pMsgBuf, msg2); - } - char name[TSDB_DB_FNAME_LEN] = {0}; - strncpy(name, pTableName->z, dbLen); - int32_t actualDbLen = strdequote(name); - - code = tNameSetDbName(pName, acctId, name, actualDbLen); - if (code != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); - } - - int32_t tbLen = pTableName->n - dbLen - 1; - if (tbLen <= 0) { - return buildInvalidOperationMsg(pMsgBuf, msg4); - } - - char tbname[TSDB_TABLE_FNAME_LEN] = {0}; - strncpy(tbname, p + 1, tbLen); - /*tbLen = */ strdequote(tbname); - - code = tNameFromString(pName, tbname, T_NAME_TABLE); - if (code != 0) { - return buildInvalidOperationMsg(pMsgBuf, msg1); - } - } else { // get current DB name first, and then set it into path - if (pTableName->n >= TSDB_TABLE_NAME_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg1); - } - - assert(pTableName->n < TSDB_TABLE_FNAME_LEN); - - char name[TSDB_TABLE_FNAME_LEN] = {0}; - strncpy(name, pTableName->z, pTableName->n); - strdequote(name); - - if (dbName == NULL) { - return buildInvalidOperationMsg(pMsgBuf, msg3); - } - - code = tNameSetDbName(pName, acctId, dbName, strlen(dbName)); - if (code != TSDB_CODE_SUCCESS) { - code = buildInvalidOperationMsg(pMsgBuf, msg2); - return code; - } - - code = tNameFromString(pName, name, T_NAME_TABLE); - if (code != 0) { - code = buildInvalidOperationMsg(pMsgBuf, msg1); - } - } - - return code; -} - static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) { SParseContext* pBasicCtx = pCxt->pComCxt; if (pBasicCtx->async) { @@ -298,74 +129,6 @@ static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgI return TSDB_CODE_SUCCESS; } -static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) { - while (start < end) { - if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) { - return start; - } - ++start; - } - return -1; -} - -static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) { - SSubmitReq* submit = (SSubmitReq*)blocks->pData; - submit->header.vgId = htonl(blocks->vg.vgId); - submit->header.contLen = htonl(blocks->size); - submit->length = submit->header.contLen; - submit->numOfBlocks = htonl(blocks->numOfTables); - SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); - int32_t numOfBlocks = blocks->numOfTables; - while (numOfBlocks--) { - int32_t dataLen = blk->dataLen; - int32_t schemaLen = blk->schemaLen; - blk->uid = htobe64(blk->uid); - blk->suid = htobe64(blk->suid); - blk->sversion = htonl(blk->sversion); - blk->dataLen = htonl(blk->dataLen); - blk->schemaLen = htonl(blk->schemaLen); - blk->numOfRows = htonl(blk->numOfRows); - blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen); - } -} - -static int32_t buildOutput(SInsertParseContext* pCxt) { - size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks); - pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); - if (NULL == pCxt->pOutput->pDataBlocks) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - for (size_t i = 0; i < numOfVg; ++i) { - STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i); - SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); - if (NULL == dst) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); - dst->numOfTables = src->numOfTables; - dst->size = src->size; - TSWAP(dst->pData, src->pData); - buildMsgHeader(src, dst); - taosArrayPush(pCxt->pOutput->pDataBlocks, &dst); - } - return TSDB_CODE_SUCCESS; -} - -int32_t checkTimestamp(STableDataBlocks* pDataBlocks, const char* start) { - // once the data block is disordered, we do NOT keep previous timestamp any more - if (!pDataBlocks->ordered) { - return TSDB_CODE_SUCCESS; - } - - TSKEY k = *(TSKEY*)start; - if (k <= pDataBlocks->prevTS) { - pDataBlocks->ordered = false; - } - - pDataBlocks->prevTS = k; - return TSDB_CODE_SUCCESS; -} - static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) { int32_t index = 0; SToken sToken; @@ -659,40 +422,6 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int return TSDB_CODE_FAILED; } -static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) { - SMemParam* pa = (SMemParam*)param; - SRowBuilder* rb = pa->rb; - - if (value == NULL) { // it is a null data - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx); - return TSDB_CODE_SUCCESS; - } - - if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { - const char* rowEnd = tdRowEnd(rb->pBuf); - STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len); - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); - } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) { - // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' - int32_t output = 0; - const char* rowEnd = tdRowEnd(rb->pBuf); - if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { - if (errno == E2BIG) { - return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name); - } - char buf[512] = {0}; - snprintf(buf, tListLen(buf), "%s", strerror(errno)); - return buildSyntaxErrMsg(pMsgBuf, buf, value); - } - varDataSetLen(rowEnd, output); - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); - } else { - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx); - } - - return TSDB_CODE_SUCCESS; -} - // pSql -> tag1_name, ...) static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) { col_id_t nCols = pColList->numOfCols; @@ -720,9 +449,9 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* sToken.n = strdequote(sToken.z); col_id_t t = lastColIdx + 1; - col_id_t index = findCol(&sToken, t, nCols, pSchema); + col_id_t index = insFindCol(&sToken, t, nCols, pSchema); if (index < 0 && t > 0) { - index = findCol(&sToken, 0, t, pSchema); + index = insFindCol(&sToken, 0, t, pSchema); isOrdered = false; } if (index < 0) { @@ -760,11 +489,11 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColIdx[i].schemaColIdx = pColList->boundColumns[i]; pColIdx[i].boundIdx = i; } - taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); + taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar); for (col_id_t i = 0; i < pColList->numOfBound; ++i) { pColIdx[i].finalIdx = i; } - taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); + taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar); } if (pColList->numOfCols > pColList->numOfBound) { @@ -775,21 +504,6 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, - SArray* tagName, uint8_t tagNum) { - pTbReq->type = TD_CHILD_TABLE; - pTbReq->name = strdup(tname); - pTbReq->ctb.suid = suid; - pTbReq->ctb.tagNum = tagNum; - if (sname) pTbReq->ctb.stbName = strdup(sname); - pTbReq->ctb.pTag = (uint8_t*)pTag; - pTbReq->ctb.tagName = taosArrayDup(tagName); - pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL; - pTbReq->commentLen = -1; - - return; -} - static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, STagVal* val, SMsgBuf* pMsgBuf) { int64_t iv; @@ -1037,8 +751,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint } if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { - buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, - pCxt->pTableMeta->tableInfo.numOfTags); + insBuildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, + pCxt->pTableMeta->tableInfo.numOfTags); pTag = NULL; } @@ -1160,7 +874,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName* NEXT_TOKEN(pCxt->pSql, sToken); SName sname; - CHECK_CODE(createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); + CHECK_CODE(insCreateSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(&sname, dbFName); strcpy(pCxt->sTableName, sname.tname); @@ -1172,7 +886,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName* CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, tbNo, name, tbFName, len, pCxt->pTableMeta)); SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); - setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); + insSetBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...) NEXT_TOKEN(pCxt->pSql, sToken); @@ -1235,8 +949,9 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, } param.schema = pSchema; - getSTSRowAppendInfo(pBuilder->rowType, spd, i, ¶m.toffset, ¶m.colIdx); - CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, ¶m, &pCxt->msg)); + insGetSTSRowAppendInfo(pBuilder->rowType, spd, i, ¶m.toffset, ¶m.colIdx); + CHECK_CODE( + parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, insMemRowAppend, ¶m, &pCxt->msg)); if (i < spd->numOfBound - 1) { NEXT_VALID_TOKEN(pCxt->pSql, sToken); @@ -1247,7 +962,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, } TSKEY tsKey = TD_ROW_KEY(row); - checkTimestamp(pDataBlocks, (const char*)&tsKey); + insCheckTimestamp(pDataBlocks, (const char*)&tsKey); if (!isParseBindParam) { // set the null value for the columns that do not assign values @@ -1270,11 +985,40 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, return TSDB_CODE_SUCCESS; } +static int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) { + size_t remain = pDataBlock->nAllocSize - pDataBlock->size; + const int factor = 5; + uint32_t nAllocSizeOld = pDataBlock->nAllocSize; + + // expand the allocated size + if (remain < rowSize * factor) { + while (remain < rowSize * factor) { + pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5); + remain = pDataBlock->nAllocSize - pDataBlock->size; + } + + char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize); + if (tmp != NULL) { + pDataBlock->pData = tmp; + memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size); + } else { + // do nothing, if allocate more memory failed + pDataBlock->nAllocSize = nAllocSizeOld; + *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; + return TSDB_CODE_SUCCESS; +} + // pSql -> (field1_value, ...) [(field1_value2, ...) ...] static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) { STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta); - int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); + int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock); + CHECK_CODE( + insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); (*numOfRows) = 0; // char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" @@ -1320,13 +1064,13 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) { int32_t maxNumOfRows; - CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows)); + CHECK_CODE(allocateMemIfNeed(dataBuf, insGetExtendedRowSize(dataBuf), &maxNumOfRows)); int32_t numOfRows = 0; CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows)); SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData); - if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) { + if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, dataBuf, numOfRows)) { return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX"); } @@ -1339,8 +1083,9 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) { STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta); - int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); + int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock); + CHECK_CODE( + insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); (*numOfRows) = 0; char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" @@ -1391,13 +1136,13 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataB static int32_t parseDataFromFileAgain(SInsertParseContext* pCxt, int16_t tableNo, const SName* pTableName, STableDataBlocks* dataBuf) { int32_t maxNumOfRows; - CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows)); + CHECK_CODE(allocateMemIfNeed(dataBuf, insGetExtendedRowSize(dataBuf), &maxNumOfRows)); int32_t numOfRows = 0; CHECK_CODE(parseCsvFile(pCxt, pCxt->pComCxt->csvCxt.fp, dataBuf, maxNumOfRows, &numOfRows)); SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData); - if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) { + if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, dataBuf, numOfRows)) { return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX"); } @@ -1448,13 +1193,13 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { taosHashCleanup(pCxt->pTableNameHashObj); taosHashCleanup(pCxt->pDbFNameHashObj); - destroyBlockHashmap(pCxt->pTableBlockHashObj); - destroyBlockArrayList(pCxt->pVgDataBlocks); + insDestroyBlockHashmap(pCxt->pTableBlockHashObj); + insDestroyBlockArrayList(pCxt->pVgDataBlocks); } static int32_t parseTableName(SInsertParseContext* pCxt, SToken* pTbnameToken, SName* pName, char* pDbFName, char* pTbFName) { - int32_t code = createSName(pName, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); + int32_t code = insCreateSName(pName, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { tNameExtractFullName(pName, pTbFName); code = taosHashPut(pCxt->pTableNameHashObj, pTbFName, strlen(pTbFName), pName, sizeof(SName)); @@ -1556,14 +1301,14 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { STableDataBlocks* dataBuf = NULL; if (pCxt->pComCxt->async) { - CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid), - TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), - getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, - &pCxt->createTblReq)); + CHECK_CODE(insGetDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, + sizeof(pCxt->pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), + getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, + &pCxt->createTblReq)); } else { - CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, - &dataBuf, NULL, &pCxt->createTblReq)); + CHECK_CODE(insGetDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE, + sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, + &dataBuf, NULL, &pCxt->createTblReq)); } if (NULL != pBoundColsStart) { @@ -1624,17 +1369,18 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { // merge according to vgId if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { - CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); + CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); } - return buildOutput(pCxt); + return insBuildOutput(pCxt); } static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) { STableDataBlocks* dataBuf = NULL; CHECK_CODE(getTableMeta(pCxt, pCxt->pComCxt->csvCxt.tableNo, &pCxt->pComCxt->csvCxt.tableName)); - CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid), - TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, - pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq)); + CHECK_CODE(insGetDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid), + TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), + getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, + &pCxt->createTblReq)); CHECK_CODE(parseDataFromFileAgain(pCxt, pCxt->pComCxt->csvCxt.tableNo, &pCxt->pComCxt->csvCxt.tableName, dataBuf)); if (taosEOFFile(pCxt->pComCxt->csvCxt.fp)) { CHECK_CODE(parseInsertBody(pCxt)); @@ -1644,9 +1390,9 @@ static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) { parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum); // merge according to vgId if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { - CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); + CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); } - return buildOutput(pCxt); + return insBuildOutput(pCxt); } // INSERT INTO @@ -1822,7 +1568,7 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) { SName name = {0}; - CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); + CHECK_CODE(insCreateSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo, pCxt->pMetaCache)); return TSDB_CODE_SUCCESS; @@ -1837,7 +1583,7 @@ static int32_t checkTableName(const char* pTableName, SMsgBuf* pMsgBuf) { static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, int32_t tableNo, SToken* pTbToken) { SName name = {0}; - CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); + CHECK_CODE(insCreateSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); CHECK_CODE(checkTableName(name.tname, &pCxt->msg)); CHECK_CODE(reserveTableMetaInCacheForInsert(&name, CATALOG_REQ_TYPE_VGROUP, tableNo, pCxt->pMetaCache)); return TSDB_CODE_SUCCESS; @@ -1941,704 +1687,3 @@ int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCa } return code; } - -int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, - int32_t msgBufLen) { - SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen}; - SToken sToken; - int32_t code = 0; - char* tbName = NULL; - - NEXT_TOKEN(pTableName, sToken); - - if (sToken.n == 0) { - return buildInvalidOperationMsg(&msg, "empty table name"); - } - - code = createSName(pName, &sToken, acctId, dbName, &msg); - if (code) { - return code; - } - - NEXT_TOKEN(pTableName, sToken); - - if (sToken.n > 0) { - return buildInvalidOperationMsg(&msg, "table name format is wrong"); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { - SVnodeModifOpStmt* modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot; - int32_t code = 0; - SInsertParseContext insertCtx = { - .pVgroupsHashObj = pVgHash, - .pTableBlockHashObj = pBlockHash, - .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot, - }; - - // merge according to vgId - if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { - CHECK_CODE(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks)); - } - - CHECK_CODE(buildOutput(&insertCtx)); - - destroyBlockArrayList(insertCtx.pVgDataBlocks); - return TSDB_CODE_SUCCESS; -} - -int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, - TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; - if (NULL == tags) { - return TSDB_CODE_QRY_APP_ERROR; - } - - SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal)); - if (!pTagArray) { - return buildInvalidOperationMsg(&pBuf, "out of memory"); - } - - SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN); - if (!tagName) { - return buildInvalidOperationMsg(&pBuf, "out of memory"); - } - - int32_t code = TSDB_CODE_SUCCESS; - SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); - - bool isJson = false; - STag* pTag = NULL; - - for (int c = 0; c < tags->numOfBound; ++c) { - if (bind[c].is_null && bind[c].is_null[0]) { - continue; - } - - SSchema* pTagSchema = &pSchema[tags->boundColumns[c]]; - int32_t colLen = pTagSchema->bytes; - if (IS_VAR_DATA_TYPE(pTagSchema->type)) { - colLen = bind[c].length[0]; - } - taosArrayPush(tagName, pTagSchema->name); - if (pTagSchema->type == TSDB_DATA_TYPE_JSON) { - if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { - code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer); - goto end; - } - - isJson = true; - char* tmp = taosMemoryCalloc(1, colLen + 1); - memcpy(tmp, bind[c].buffer, colLen); - code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf); - taosMemoryFree(tmp); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - } else { - STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; - // strcpy(val.colName, pTagSchema->name); - if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) { - val.pData = (uint8_t*)bind[c].buffer; - val.nData = colLen; - } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) { - int32_t output = 0; - void* p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE); - if (p == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) { - if (errno == E2BIG) { - taosMemoryFree(p); - code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name); - goto end; - } - char buf[512] = {0}; - snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno)); - taosMemoryFree(p); - code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer); - goto end; - } - val.pData = p; - val.nData = output; - } else { - memcpy(&val.i64, bind[c].buffer, colLen); - } - taosArrayPush(pTagArray, &val); - } - } - - if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) { - goto end; - } - - SVCreateTbReq tbReq = {0}; - buildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags); - code = buildCreateTbMsg(pDataBlock, &tbReq); - tdDestroySVCreateTbReq(&tbReq); - -end: - for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { - STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); - if (p->type == TSDB_DATA_TYPE_NCHAR) { - taosMemoryFreeClear(p->pData); - } - } - taosArrayDestroy(pTagArray); - taosArrayDestroy(tagName); - - return code; -} - -int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; - SRowBuilder* pBuilder = &pDataBlock->rowBuilder; - SMemParam param = {.rb = pBuilder}; - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - int32_t rowNum = bind->num; - - CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); - - CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num)); - - for (int32_t r = 0; r < bind->num; ++r) { - STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header - tdSRowResetBuf(pBuilder, row); - - for (int c = 0; c < spd->numOfBound; ++c) { - SSchema* pColSchema = &pSchema[spd->boundColumns[c]]; - - if (bind[c].num != rowNum) { - return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); - } - - param.schema = pColSchema; - getSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); - - if (bind[c].is_null && bind[c].is_null[r]) { - if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL"); - } - - CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m)); - } else { - if (bind[c].buffer_type != pColSchema->type) { - return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); - } - - int32_t colLen = pColSchema->bytes; - if (IS_VAR_DATA_TYPE(pColSchema->type)) { - colLen = bind[c].length[r]; - } - - CHECK_CODE(MemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m)); - } - - if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { - TSKEY tsKey = TD_ROW_KEY(row); - checkTimestamp(pDataBlock, (const char*)&tsKey); - } - } - // set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { - pBuilder->hasNone = true; - } - tdSRowEnd(pBuilder); -#ifdef TD_DEBUG_PRINT_ROW - STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1); - tdSRowPrint(row, pSTSchema, __func__); - taosMemoryFree(pSTSchema); -#endif - pDataBlock->size += extendedRowSize; - } - - SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); - if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { - return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX"); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, - int32_t rowNum) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; - SRowBuilder* pBuilder = &pDataBlock->rowBuilder; - SMemParam param = {.rb = pBuilder}; - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - bool rowStart = (0 == colIdx); - bool rowEnd = ((colIdx + 1) == spd->numOfBound); - - if (rowStart) { - CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); - CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num)); - } - - for (int32_t r = 0; r < bind->num; ++r) { - STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r); // skip the SSubmitBlk header - if (rowStart) { - tdSRowResetBuf(pBuilder, row); - } else { - tdSRowGetBuf(pBuilder, row); - } - - SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]]; - - if (bind->num != rowNum) { - return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); - } - - param.schema = pColSchema; - getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, ¶m.toffset, ¶m.colIdx); - - if (bind->is_null && bind->is_null[r]) { - if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL"); - } - - CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m)); - } else { - if (bind->buffer_type != pColSchema->type) { - return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); - } - - int32_t colLen = pColSchema->bytes; - if (IS_VAR_DATA_TYPE(pColSchema->type)) { - colLen = bind->length[r]; - } - - CHECK_CODE(MemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, ¶m)); - } - - if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { - TSKEY tsKey = TD_ROW_KEY(row); - checkTimestamp(pDataBlock, (const char*)&tsKey); - } - - // set the null value for the columns that do not assign values - if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { - pBuilder->hasNone = true; - } - if (rowEnd) { - tdSRowEnd(pBuilder); - } -#ifdef TD_DEBUG_PRINT_ROW - if (rowEnd) { - STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1); - tdSRowPrint(row, pSTSchema, __func__); - taosMemoryFree(pSTSchema); - } -#endif - } - - if (rowEnd) { - pDataBlock->size += extendedRowSize * bind->num; - - SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); - if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { - return buildInvalidOperationMsg(&pBuf, - "too many rows in sql, total number of rows should be less than INT32_MAX"); - } - } - - return TSDB_CODE_SUCCESS; -} - -int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, - uint8_t timePrec) { - if (fields) { - *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD)); - if (NULL == *fields) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - SSchema* schema = &pSchema[boundInfo->boundColumns[0]]; - if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) { - (*fields)[0].precision = timePrec; - } - - for (int32_t i = 0; i < boundInfo->numOfBound; ++i) { - schema = &pSchema[boundInfo->boundColumns[i]]; - strcpy((*fields)[i].name, schema->name); - (*fields)[i].type = schema->type; - (*fields)[i].bytes = schema->bytes; - } - } - - *fieldNum = boundInfo->numOfBound; - - return TSDB_CODE_SUCCESS; -} - -int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; - if (NULL == tags) { - return TSDB_CODE_QRY_APP_ERROR; - } - - if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) { - return TSDB_CODE_TSC_STMT_API_ERROR; - } - - SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); - if (tags->numOfBound <= 0) { - *fieldNum = 0; - *fields = NULL; - - return TSDB_CODE_SUCCESS; - } - - CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0)); - - return TSDB_CODE_SUCCESS; -} - -int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - if (pDataBlock->boundColumnInfo.numOfBound <= 0) { - *fieldNum = 0; - if (fields) { - *fields = NULL; - } - - return TSDB_CODE_SUCCESS; - } - - CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields, - pDataBlock->pTableMeta->tableInfo.precision)); - - return TSDB_CODE_SUCCESS; -} - -// schemaless logic start - -typedef struct SmlExecTableHandle { - SParsedDataColInfo tags; // each table - SVCreateTbReq createTblReq; // each table -} SmlExecTableHandle; - -typedef struct SmlExecHandle { - SHashObj* pBlockHash; - SmlExecTableHandle tableExecHandle; - SQuery* pQuery; -} SSmlExecHandle; - -static void smlDestroyTableHandle(void* pHandle) { - SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle; - destroyBoundColumnInfo(&handle->tags); - tdDestroySVCreateTbReq(&handle->createTblReq); -} - -static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema, bool isTag) { - col_id_t nCols = pColList->numOfCols; - - pColList->numOfBound = 0; - pColList->boundNullLen = 0; - memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols); - for (col_id_t i = 0; i < nCols; ++i) { - pColList->cols[i].valStat = VAL_STAT_NONE; - } - - bool isOrdered = true; - col_id_t lastColIdx = -1; // last column found - for (int i = 0; i < taosArrayGetSize(cols); ++i) { - SSmlKv* kv = taosArrayGetP(cols, i); - SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; - col_id_t t = lastColIdx + 1; - col_id_t index = ((t == 0 && !isTag) ? 0 : findCol(&sToken, t, nCols, pSchema)); - uDebug("SML, index:%d, t:%d, ncols:%d", index, t, nCols); - if (index < 0 && t > 0) { - index = findCol(&sToken, 0, t, pSchema); - isOrdered = false; - } - if (index < 0) { - uError("smlBoundColumnData. index:%d", index); - return TSDB_CODE_SML_INVALID_DATA; - } - if (pColList->cols[index].valStat == VAL_STAT_HAS) { - uError("smlBoundColumnData. already set. index:%d", index); - return TSDB_CODE_SML_INVALID_DATA; - } - lastColIdx = index; - pColList->cols[index].valStat = VAL_STAT_HAS; - pColList->boundColumns[pColList->numOfBound] = index; - ++pColList->numOfBound; - switch (pSchema[t].type) { - case TSDB_DATA_TYPE_BINARY: - pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES); - break; - case TSDB_DATA_TYPE_NCHAR: - pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); - break; - default: - pColList->boundNullLen += TYPE_BYTES[pSchema[t].type]; - break; - } - } - - pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; - - if (!isOrdered) { - pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo)); - if (NULL == pColList->colIdxInfo) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - SBoundIdxInfo* pColIdx = pColList->colIdxInfo; - for (col_id_t i = 0; i < pColList->numOfBound; ++i) { - pColIdx[i].schemaColIdx = pColList->boundColumns[i]; - pColIdx[i].boundIdx = i; - } - taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); - for (col_id_t i = 0; i < pColList->numOfBound; ++i) { - pColIdx[i].finalIdx = i; - } - taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); - } - - if (pColList->numOfCols > pColList->numOfBound) { - memset(&pColList->boundColumns[pColList->numOfBound], 0, - sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound)); - } - - return TSDB_CODE_SUCCESS; -} - -/** - * @brief No json tag for schemaless - * - * @param cols - * @param tags - * @param pSchema - * @param ppTag - * @param msg - * @return int32_t - */ -static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName, - SMsgBuf* msg) { - SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal)); - if (!pTagArray) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - *tagName = taosArrayInit(8, TSDB_COL_NAME_LEN); - if (!*tagName) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - int32_t code = TSDB_CODE_SUCCESS; - for (int i = 0; i < tags->numOfBound; ++i) { - SSchema* pTagSchema = &pSchema[tags->boundColumns[i]]; - SSmlKv* kv = taosArrayGetP(cols, i); - - taosArrayPush(*tagName, pTagSchema->name); - STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; - // strcpy(val.colName, pTagSchema->name); - if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) { - val.pData = (uint8_t*)kv->value; - val.nData = kv->length; - } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) { - int32_t output = 0; - void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE); - if (p == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output)) { - if (errno == E2BIG) { - taosMemoryFree(p); - code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name); - goto end; - } - char buf[512] = {0}; - snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno)); - taosMemoryFree(p); - code = buildSyntaxErrMsg(msg, buf, kv->value); - goto end; - } - val.pData = p; - val.nData = output; - } else { - memcpy(&val.i64, &(kv->value), kv->length); - } - taosArrayPush(pTagArray, &val); - } - - code = tTagNew(pTagArray, 1, false, ppTag); -end: - for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { - STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); - if (p->type == TSDB_DATA_TYPE_NCHAR) { - taosMemoryFree(p->pData); - } - } - taosArrayDestroy(pTagArray); - return code; -} - -int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, - char* tableName, const char* sTableName, int32_t sTableNameLen, char* msgBuf, int16_t msgBufLen) { - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - - SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; - smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table - SSchema* pTagsSchema = getTableTagSchema(pTableMeta); - setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta)); - int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, true); - if (ret != TSDB_CODE_SUCCESS) { - buildInvalidOperationMsg(&pBuf, "bound tags error"); - return ret; - } - STag* pTag = NULL; - SArray* tagName = NULL; - ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &tagName, &pBuf); - if (ret != TSDB_CODE_SUCCESS) { - taosArrayDestroy(tagName); - return ret; - } - - buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, - pTableMeta->tableInfo.numOfTags); - taosArrayDestroy(tagName); - - smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1); - memcpy(smlHandle->tableExecHandle.createTblReq.ctb.stbName, sTableName, sTableNameLen); - smlHandle->tableExecHandle.createTblReq.ctb.stbName[sTableNameLen] = 0; - - STableDataBlocks* pDataBlock = NULL; - ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), - TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, - pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq); - if (ret != TSDB_CODE_SUCCESS) { - buildInvalidOperationMsg(&pBuf, "create data block error"); - return ret; - } - - SSchema* pSchema = getTableColumnSchema(pTableMeta); - - ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema, false); - if (ret != TSDB_CODE_SUCCESS) { - buildInvalidOperationMsg(&pBuf, "bound cols error"); - return ret; - } - int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; - SRowBuilder* pBuilder = &pDataBlock->rowBuilder; - SMemParam param = {.rb = pBuilder}; - - initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo); - - int32_t rowNum = taosArrayGetSize(cols); - if (rowNum <= 0) { - return buildInvalidOperationMsg(&pBuf, "cols size <= 0"); - } - ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum); - if (ret != TSDB_CODE_SUCCESS) { - buildInvalidOperationMsg(&pBuf, "allocate memory error"); - return ret; - } - for (int32_t r = 0; r < rowNum; ++r) { - STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header - tdSRowResetBuf(pBuilder, row); - void* rowData = taosArrayGetP(cols, r); - size_t rowDataSize = 0; - if (format) { - rowDataSize = taosArrayGetSize(rowData); - } - - // 1. set the parsed value from sql string - for (int c = 0, j = 0; c < spd->numOfBound; ++c) { - SSchema* pColSchema = &pSchema[spd->boundColumns[c]]; - - param.schema = pColSchema; - getSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); - - SSmlKv* kv = NULL; - if (format) { - if (j < rowDataSize) { - kv = taosArrayGetP(rowData, j); - if (rowDataSize != spd->numOfBound && j != 0 && - (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) { - kv = NULL; - } else { - j++; - } - } - } else { - void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); - if (p) kv = *p; - } - - if (kv) { - int32_t colLen = kv->length; - if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { - // uError("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); - kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); - // uError("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); - } - - if (IS_VAR_DATA_TYPE(kv->type)) { - MemRowAppend(&pBuf, kv->value, colLen, ¶m); - } else { - MemRowAppend(&pBuf, &(kv->value), colLen, ¶m); - } - } else { - pBuilder->hasNone = true; - } - - if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { - TSKEY tsKey = TD_ROW_KEY(row); - checkTimestamp(pDataBlock, (const char*)&tsKey); - } - } - - // set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { - pBuilder->hasNone = true; - } - - tdSRowEnd(pBuilder); - pDataBlock->size += extendedRowSize; - } - - SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); - if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) { - return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX"); - } - - return TSDB_CODE_SUCCESS; -} - -void* smlInitHandle(SQuery* pQuery) { - SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle)); - if (!handle) return NULL; - handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - handle->pQuery = pQuery; - - return handle; -} - -void smlDestroyHandle(void* pHandle) { - if (!pHandle) return; - SSmlExecHandle* handle = (SSmlExecHandle*)pHandle; - destroyBlockHashmap(handle->pBlockHash); - smlDestroyTableHandle(&handle->tableExecHandle); - taosMemoryFree(handle); -} - -int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) { - SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; - return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash); -} -// schemaless logic end diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c new file mode 100644 index 0000000000000000000000000000000000000000..fda3e0866d8d97863e13fc579c2319d52d078221 --- /dev/null +++ b/source/libs/parser/src/parInsertStmt.c @@ -0,0 +1,486 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "parInsertUtil.h" +#include "parInt.h" +#include "parToken.h" +#include "query.h" +#include "tglobal.h" +#include "ttime.h" +#include "ttypes.h" + +typedef struct SKvParam { + int16_t pos; + SArray* pTagVals; + SSchema* schema; + char buf[TSDB_MAX_TAGS_LEN]; +} SKvParam; + +int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { + SVnodeModifOpStmt* modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot; + int32_t code = 0; + SInsertParseContext insertCtx = { + .pVgroupsHashObj = pVgHash, + .pTableBlockHashObj = pBlockHash, + .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot, + }; + + // merge according to vgId + if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { + CHECK_CODE( + insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks)); + } + + CHECK_CODE(insBuildOutput(&insertCtx)); + + insDestroyBlockArrayList(insertCtx.pVgDataBlocks); + return TSDB_CODE_SUCCESS; +} + +int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, + TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { + STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + if (NULL == tags) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal)); + if (!pTagArray) { + return buildInvalidOperationMsg(&pBuf, "out of memory"); + } + + SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN); + if (!tagName) { + return buildInvalidOperationMsg(&pBuf, "out of memory"); + } + + int32_t code = TSDB_CODE_SUCCESS; + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + + bool isJson = false; + STag* pTag = NULL; + + for (int c = 0; c < tags->numOfBound; ++c) { + if (bind[c].is_null && bind[c].is_null[0]) { + continue; + } + + SSchema* pTagSchema = &pSchema[tags->boundColumns[c]]; + int32_t colLen = pTagSchema->bytes; + if (IS_VAR_DATA_TYPE(pTagSchema->type)) { + colLen = bind[c].length[0]; + } + taosArrayPush(tagName, pTagSchema->name); + if (pTagSchema->type == TSDB_DATA_TYPE_JSON) { + if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { + code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer); + goto end; + } + + isJson = true; + char* tmp = taosMemoryCalloc(1, colLen + 1); + memcpy(tmp, bind[c].buffer, colLen); + code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf); + taosMemoryFree(tmp); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + } else { + STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; + // strcpy(val.colName, pTagSchema->name); + if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) { + val.pData = (uint8_t*)bind[c].buffer; + val.nData = colLen; + } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) { + int32_t output = 0; + void* p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) { + if (errno == E2BIG) { + taosMemoryFree(p); + code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name); + goto end; + } + char buf[512] = {0}; + snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno)); + taosMemoryFree(p); + code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer); + goto end; + } + val.pData = p; + val.nData = output; + } else { + memcpy(&val.i64, bind[c].buffer, colLen); + } + taosArrayPush(pTagArray, &val); + } + } + + if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) { + goto end; + } + + SVCreateTbReq tbReq = {0}; + insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags); + code = insBuildCreateTbMsg(pDataBlock, &tbReq); + tdDestroySVCreateTbReq(&tbReq); + +end: + for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { + STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); + if (p->type == TSDB_DATA_TYPE_NCHAR) { + taosMemoryFreeClear(p->pData); + } + } + taosArrayDestroy(pTagArray); + taosArrayDestroy(tagName); + + return code; +} + +int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { + STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock); + SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; + SRowBuilder* pBuilder = &pDataBlock->rowBuilder; + SMemParam param = {.rb = pBuilder}; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + int32_t rowNum = bind->num; + + CHECK_CODE( + insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); + + CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num)); + + for (int32_t r = 0; r < bind->num; ++r) { + STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header + tdSRowResetBuf(pBuilder, row); + + for (int c = 0; c < spd->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[spd->boundColumns[c]]; + + if (bind[c].num != rowNum) { + return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); + } + + param.schema = pColSchema; + insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); + + if (bind[c].is_null && bind[c].is_null[r]) { + if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL"); + } + + CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, ¶m)); + } else { + if (bind[c].buffer_type != pColSchema->type) { + return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + } + + int32_t colLen = pColSchema->bytes; + if (IS_VAR_DATA_TYPE(pColSchema->type)) { + colLen = bind[c].length[r]; + } + + CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m)); + } + + if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { + TSKEY tsKey = TD_ROW_KEY(row); + insCheckTimestamp(pDataBlock, (const char*)&tsKey); + } + } + // set the null value for the columns that do not assign values + if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { + pBuilder->hasNone = true; + } + tdSRowEnd(pBuilder); +#ifdef TD_DEBUG_PRINT_ROW + STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1); + tdSRowPrint(row, pSTSchema, __func__); + taosMemoryFree(pSTSchema); +#endif + pDataBlock->size += extendedRowSize; + } + + SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); + if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, bind->num)) { + return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX"); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, + int32_t rowNum) { + STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock); + SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; + SRowBuilder* pBuilder = &pDataBlock->rowBuilder; + SMemParam param = {.rb = pBuilder}; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + bool rowStart = (0 == colIdx); + bool rowEnd = ((colIdx + 1) == spd->numOfBound); + + if (rowStart) { + CHECK_CODE( + insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); + CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num)); + } + + for (int32_t r = 0; r < bind->num; ++r) { + STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r); // skip the SSubmitBlk header + if (rowStart) { + tdSRowResetBuf(pBuilder, row); + } else { + tdSRowGetBuf(pBuilder, row); + } + + SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]]; + + if (bind->num != rowNum) { + return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); + } + + param.schema = pColSchema; + insGetSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, ¶m.toffset, ¶m.colIdx); + + if (bind->is_null && bind->is_null[r]) { + if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL"); + } + + CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, ¶m)); + } else { + if (bind->buffer_type != pColSchema->type) { + return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + } + + int32_t colLen = pColSchema->bytes; + if (IS_VAR_DATA_TYPE(pColSchema->type)) { + colLen = bind->length[r]; + } + + CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, ¶m)); + } + + if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { + TSKEY tsKey = TD_ROW_KEY(row); + insCheckTimestamp(pDataBlock, (const char*)&tsKey); + } + + // set the null value for the columns that do not assign values + if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { + pBuilder->hasNone = true; + } + if (rowEnd) { + tdSRowEnd(pBuilder); + } +#ifdef TD_DEBUG_PRINT_ROW + if (rowEnd) { + STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1); + tdSRowPrint(row, pSTSchema, __func__); + taosMemoryFree(pSTSchema); + } +#endif + } + + if (rowEnd) { + pDataBlock->size += extendedRowSize * bind->num; + + SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); + if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, bind->num)) { + return buildInvalidOperationMsg(&pBuf, + "too many rows in sql, total number of rows should be less than INT32_MAX"); + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, + uint8_t timePrec) { + if (fields) { + *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD)); + if (NULL == *fields) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSchema* schema = &pSchema[boundInfo->boundColumns[0]]; + if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) { + (*fields)[0].precision = timePrec; + } + + for (int32_t i = 0; i < boundInfo->numOfBound; ++i) { + schema = &pSchema[boundInfo->boundColumns[i]]; + strcpy((*fields)[i].name, schema->name); + (*fields)[i].type = schema->type; + (*fields)[i].bytes = schema->bytes; + } + } + + *fieldNum = boundInfo->numOfBound; + + return TSDB_CODE_SUCCESS; +} + +int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) { + STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + if (NULL == tags) { + return TSDB_CODE_QRY_APP_ERROR; + } + + if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) { + return TSDB_CODE_TSC_STMT_API_ERROR; + } + + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + if (tags->numOfBound <= 0) { + *fieldNum = 0; + *fields = NULL; + + return TSDB_CODE_SUCCESS; + } + + CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) { + STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + if (pDataBlock->boundColumnInfo.numOfBound <= 0) { + *fieldNum = 0; + if (fields) { + *fields = NULL; + } + + return TSDB_CODE_SUCCESS; + } + + CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields, + pDataBlock->pTableMeta->tableInfo.precision)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qResetStmtDataBlock(void* block, bool keepBuf) { + STableDataBlocks* pBlock = (STableDataBlocks*)block; + + if (keepBuf) { + taosMemoryFreeClear(pBlock->pData); + pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE); + if (NULL == pBlock->pData) { + return TSDB_CODE_OUT_OF_MEMORY; + } + memset(pBlock->pData, 0, sizeof(SSubmitBlk)); + } else { + pBlock->pData = NULL; + } + + pBlock->ordered = true; + pBlock->prevTS = INT64_MIN; + pBlock->size = sizeof(SSubmitBlk); + pBlock->tsSource = -1; + pBlock->numOfTables = 1; + pBlock->nAllocSize = TSDB_PAYLOAD_SIZE; + pBlock->headerSize = pBlock->size; + pBlock->createTbReqLen = 0; + + memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) { + *pDst = taosMemoryMalloc(sizeof(STableDataBlocks)); + if (NULL == *pDst) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(*pDst, pSrc, sizeof(STableDataBlocks)); + ((STableDataBlocks*)(*pDst))->cloned = true; + + STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst); + if (pBlock->pTableMeta) { + void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta)); + if (NULL == pNewMeta) { + taosMemoryFreeClear(*pDst); + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta)); + pBlock->pTableMeta = pNewMeta; + } + + return qResetStmtDataBlock(*pDst, false); +} + +int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) { + int32_t code = qCloneStmtDataBlock(pDst, pSrc); + if (code) { + return code; + } + + STableDataBlocks* pBlock = (STableDataBlocks*)*pDst; + pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize); + if (NULL == pBlock->pData) { + qFreeStmtDataBlock(pBlock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pBlock->vgId = vgId; + + if (pBlock->pTableMeta) { + pBlock->pTableMeta->uid = uid; + pBlock->pTableMeta->vgId = vgId; + } + + memset(pBlock->pData, 0, sizeof(SSubmitBlk)); + + return TSDB_CODE_SUCCESS; +} + +STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; } + +void qFreeStmtDataBlock(void* pDataBlock) { + if (pDataBlock == NULL) { + return; + } + + taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta); + taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData); + taosMemoryFreeClear(pDataBlock); +} + +void qDestroyStmtDataBlock(void* pBlock) { + if (pBlock == NULL) { + return; + } + + STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + + pDataBlock->cloned = false; + insDestroyDataBlock(pDataBlock); +} diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertUtil.c similarity index 69% rename from source/libs/parser/src/parInsertData.c rename to source/libs/parser/src/parInsertUtil.c index 954c1b332a782fd212902753b2c2505ad29b50cf..e921cfe997d8225e74e0337b98208db09b09cc71 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "parInsertData.h" +#include "parInsertUtil.h" #include "catalog.h" #include "parInt.h" @@ -81,7 +81,53 @@ static int32_t rowDataComparStable(const void* lhs, const void* rhs) { } } -void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) { +int32_t insGetExtendedRowSize(STableDataBlocks* pBlock) { + STableComInfo* pTableInfo = &pBlock->pTableMeta->tableInfo; + ASSERT(pBlock->rowSize == pTableInfo->rowSize); + return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen + + (int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1); +} + +void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo* spd, col_id_t idx, int32_t* toffset, + col_id_t* colIdx) { + col_id_t schemaIdx = 0; + if (IS_DATA_COL_ORDERED(spd)) { + schemaIdx = spd->boundColumns[idx]; + if (TD_IS_TP_ROW_T(rowType)) { + *toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart + *colIdx = schemaIdx; + } else { + *toffset = idx * sizeof(SKvRowIdx); // the offset of SKvRowIdx + *colIdx = idx; + } + } else { + ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx); + schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx; + if (TD_IS_TP_ROW_T(rowType)) { + *toffset = (spd->cols + schemaIdx)->toffset; + *colIdx = schemaIdx; + } else { + *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SKvRowIdx); + *colIdx = (spd->colIdxInfo + idx)->finalIdx; + } + } +} + +int32_t insSetBlockInfo(SSubmitBlk* pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows) { + pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid); + pBlocks->uid = dataBuf->pTableMeta->uid; + pBlocks->sversion = dataBuf->pTableMeta->sversion; + pBlocks->schemaLen = dataBuf->createTbReqLen; + + if (pBlocks->numOfRows + numOfRows >= INT32_MAX) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } else { + pBlocks->numOfRows += numOfRows; + return TSDB_CODE_SUCCESS; + } +} + +void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) { pColList->numOfCols = numOfCols; pColList->numOfBound = numOfCols; pColList->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode @@ -118,7 +164,7 @@ void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT)); } -int32_t schemaIdxCompar(const void* lhs, const void* rhs) { +int32_t insSchemaIdxCompar(const void* lhs, const void* rhs) { uint16_t left = *(uint16_t*)lhs; uint16_t right = *(uint16_t*)rhs; @@ -129,7 +175,7 @@ int32_t schemaIdxCompar(const void* lhs, const void* rhs) { } } -int32_t boundIdxCompar(const void* lhs, const void* rhs) { +int32_t insBoundIdxCompar(const void* lhs, const void* rhs) { uint16_t left = *(uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t)); uint16_t right = *(uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t)); @@ -178,7 +224,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo; SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta); - setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns); + insSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns); dataBuf->ordered = true; dataBuf->prevTS = INT64_MIN; @@ -192,7 +238,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star return TSDB_CODE_SUCCESS; } -int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) { +int32_t insBuildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) { SEncoder coder = {0}; char* pBuf; int32_t len; @@ -222,7 +268,7 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) return code; } -static void destroyDataBlock(STableDataBlocks* pDataBlock) { +void insDestroyDataBlock(STableDataBlocks* pDataBlock) { if (pDataBlock == NULL) { return; } @@ -237,9 +283,9 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) { taosMemoryFreeClear(pDataBlock); } -int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, - int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, - SVCreateTbReq* pCreateTbReq) { +int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, + int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, + SArray* pBlockList, SVCreateTbReq* pCreateTbReq) { *dataBlocks = NULL; STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen); if (t1 != NULL) { @@ -253,9 +299,9 @@ int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32 } if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) { - ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq); + ret = insBuildCreateTbMsg(*dataBlocks, pCreateTbReq); if (ret != TSDB_CODE_SUCCESS) { - destroyDataBlock(*dataBlocks); + insDestroyDataBlock(*dataBlocks); return ret; } } @@ -283,7 +329,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -void destroyBlockArrayList(SArray* pDataBlockList) { +void insDestroyBlockArrayList(SArray* pDataBlockList) { if (pDataBlockList == NULL) { return; } @@ -291,13 +337,13 @@ void destroyBlockArrayList(SArray* pDataBlockList) { size_t size = taosArrayGetSize(pDataBlockList); for (int32_t i = 0; i < size; i++) { void* p = taosArrayGetP(pDataBlockList, i); - destroyDataBlock(p); + insDestroyDataBlock(p); } taosArrayDestroy(pDataBlockList); } -void destroyBlockHashmap(SHashObj* pDataBlockHash) { +void insDestroyBlockHashmap(SHashObj* pDataBlockHash) { if (pDataBlockHash == NULL) { return; } @@ -305,7 +351,7 @@ void destroyBlockHashmap(SHashObj* pDataBlockHash) { void** p1 = taosHashIterate(pDataBlockHash, NULL); while (p1) { STableDataBlocks* pBlocks = *p1; - destroyDataBlock(pBlocks); + insDestroyDataBlock(pBlocks); p1 = taosHashIterate(pDataBlockHash, p1); } @@ -375,7 +421,7 @@ static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* } memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc); - int32_t extendedRowSize = getExtendedRowSize(dataBuf); + int32_t extendedRowSize = insGetExtendedRowSize(dataBuf); SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; char* pBlockData = pBlocks->data + pBlocks->schemaLen; int n = 0; @@ -545,7 +591,7 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p tdResetSBlockRowMerger(*ppBlkRowMerger); - int32_t extendedRowSize = getExtendedRowSize(dataBuf); + int32_t extendedRowSize = insGetExtendedRowSize(dataBuf); SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; char* pBlockData = pBlocks->data + pBlocks->schemaLen; int32_t n = 0; @@ -679,7 +725,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB return pBlock->dataLen + pBlock->schemaLen; } -int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) { +int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) { const int INSERT_HEAD_SIZE = sizeof(SSubmitReq); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(payloadType); @@ -696,13 +742,13 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p if (pBlocks->numOfRows > 0) { STableDataBlocks* dataBuf = NULL; pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId; // for schemaless, restore origin vgId - int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId), - TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, - pVnodeDataBlockList, NULL); + int32_t ret = insGetDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, + sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, + pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL); if (ret != TSDB_CODE_SUCCESS) { tdFreeSBlockRowMerger(pBlkRowMerger); taosHashCleanup(pVnodeDataBlockHashList); - destroyBlockArrayList(pVnodeDataBlockList); + insDestroyBlockArrayList(pVnodeDataBlockList); taosMemoryFreeClear(blkKeyInfo.pKeyTuple); return ret; } @@ -721,7 +767,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p } else { // failed to allocate memory, free already allocated memory and return error code tdFreeSBlockRowMerger(pBlkRowMerger); taosHashCleanup(pVnodeDataBlockHashList); - destroyBlockArrayList(pVnodeDataBlockList); + insDestroyBlockArrayList(pVnodeDataBlockList); taosMemoryFreeClear(dataBuf->pData); taosMemoryFreeClear(blkKeyInfo.pKeyTuple); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -734,7 +780,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) { tdFreeSBlockRowMerger(pBlkRowMerger); taosHashCleanup(pVnodeDataBlockHashList); - destroyBlockArrayList(pVnodeDataBlockList); + insDestroyBlockArrayList(pVnodeDataBlockList); taosMemoryFreeClear(dataBuf->pData); taosMemoryFreeClear(blkKeyInfo.pKeyTuple); return code; @@ -767,7 +813,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p return TSDB_CODE_SUCCESS; } -int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) { +int32_t insAllocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) { size_t remain = pDataBlock->nAllocSize - pDataBlock->size; uint32_t nAllocSizeOld = pDataBlock->nAllocSize; @@ -789,137 +835,226 @@ int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) { return TSDB_CODE_SUCCESS; } -int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) { - size_t remain = pDataBlock->nAllocSize - pDataBlock->size; - const int factor = 5; - uint32_t nAllocSizeOld = pDataBlock->nAllocSize; +int32_t insInitRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) { + ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols)); + tdSRowInit(pBuilder, schemaVer); + tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen, + pColInfo->boundNullLen); + return TSDB_CODE_SUCCESS; +} - // expand the allocated size - if (remain < rowSize * factor) { - while (remain < rowSize * factor) { - pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5); - remain = pDataBlock->nAllocSize - pDataBlock->size; +static char* tableNameGetPosition(SToken* pToken, char target) { + bool inEscape = false; + bool inQuote = false; + char quotaStr = 0; + + for (uint32_t i = 0; i < pToken->n; ++i) { + if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) { + return pToken->z + i; } - char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize); - if (tmp != NULL) { - pDataBlock->pData = tmp; - memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size); - } else { - // do nothing, if allocate more memory failed - pDataBlock->nAllocSize = nAllocSizeOld; - *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; - return TSDB_CODE_TSC_OUT_OF_MEMORY; + if (*(pToken->z + i) == TS_ESCAPE_CHAR) { + if (!inQuote) { + inEscape = !inEscape; + } + } + + if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') { + if (!inEscape) { + if (!inQuote) { + quotaStr = *(pToken->z + i); + inQuote = !inQuote; + } else if (quotaStr == *(pToken->z + i)) { + inQuote = !inQuote; + } + } } } - *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; - return TSDB_CODE_SUCCESS; + return NULL; } -int initRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) { - ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols)); - tdSRowInit(pBuilder, schemaVer); - tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen, - pColInfo->boundNullLen); - return TSDB_CODE_SUCCESS; -} +int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) { + const char* msg1 = "name too long"; + const char* msg2 = "invalid database name"; + const char* msg3 = "db is not specified"; + const char* msg4 = "invalid table name"; + + int32_t code = TSDB_CODE_SUCCESS; + char* p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]); -int32_t qResetStmtDataBlock(void* block, bool keepBuf) { - STableDataBlocks* pBlock = (STableDataBlocks*)block; + if (p != NULL) { // db has been specified in sql string so we ignore current db path + assert(*p == TS_PATH_DELIMITER[0]); - if (keepBuf) { - taosMemoryFreeClear(pBlock->pData); - pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE); - if (NULL == pBlock->pData) { - return TSDB_CODE_OUT_OF_MEMORY; + int32_t dbLen = p - pTableName->z; + if (dbLen <= 0) { + return buildInvalidOperationMsg(pMsgBuf, msg2); } - memset(pBlock->pData, 0, sizeof(SSubmitBlk)); - } else { - pBlock->pData = NULL; - } + char name[TSDB_DB_FNAME_LEN] = {0}; + strncpy(name, pTableName->z, dbLen); + int32_t actualDbLen = strdequote(name); - pBlock->ordered = true; - pBlock->prevTS = INT64_MIN; - pBlock->size = sizeof(SSubmitBlk); - pBlock->tsSource = -1; - pBlock->numOfTables = 1; - pBlock->nAllocSize = TSDB_PAYLOAD_SIZE; - pBlock->headerSize = pBlock->size; - pBlock->createTbReqLen = 0; + code = tNameSetDbName(pName, acctId, name, actualDbLen); + if (code != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } - memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder)); + int32_t tbLen = pTableName->n - dbLen - 1; + if (tbLen <= 0) { + return buildInvalidOperationMsg(pMsgBuf, msg4); + } - return TSDB_CODE_SUCCESS; -} + char tbname[TSDB_TABLE_FNAME_LEN] = {0}; + strncpy(tbname, p + 1, tbLen); + /*tbLen = */ strdequote(tbname); -int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) { - *pDst = taosMemoryMalloc(sizeof(STableDataBlocks)); - if (NULL == *pDst) { - return TSDB_CODE_OUT_OF_MEMORY; - } + code = tNameFromString(pName, tbname, T_NAME_TABLE); + if (code != 0) { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + } else { // get current DB name first, and then set it into path + if (pTableName->n >= TSDB_TABLE_NAME_LEN) { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + + assert(pTableName->n < TSDB_TABLE_FNAME_LEN); + + char name[TSDB_TABLE_FNAME_LEN] = {0}; + strncpy(name, pTableName->z, pTableName->n); + strdequote(name); + + if (dbName == NULL) { + return buildInvalidOperationMsg(pMsgBuf, msg3); + } - memcpy(*pDst, pSrc, sizeof(STableDataBlocks)); - ((STableDataBlocks*)(*pDst))->cloned = true; + code = tNameSetDbName(pName, acctId, dbName, strlen(dbName)); + if (code != TSDB_CODE_SUCCESS) { + code = buildInvalidOperationMsg(pMsgBuf, msg2); + return code; + } - STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst); - if (pBlock->pTableMeta) { - void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta)); - if (NULL == pNewMeta) { - taosMemoryFreeClear(*pDst); - return TSDB_CODE_OUT_OF_MEMORY; + code = tNameFromString(pName, name, T_NAME_TABLE); + if (code != 0) { + code = buildInvalidOperationMsg(pMsgBuf, msg1); } - memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta)); - pBlock->pTableMeta = pNewMeta; } - return qResetStmtDataBlock(*pDst, false); + return code; } -int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) { - int32_t code = qCloneStmtDataBlock(pDst, pSrc); - if (code) { - return code; +int32_t insFindCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) { + while (start < end) { + if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) { + return start; + } + ++start; } + return -1; +} - STableDataBlocks* pBlock = (STableDataBlocks*)*pDst; - pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize); - if (NULL == pBlock->pData) { - qFreeStmtDataBlock(pBlock); - return TSDB_CODE_OUT_OF_MEMORY; - } +void insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, + SArray* tagName, uint8_t tagNum) { + pTbReq->type = TD_CHILD_TABLE; + pTbReq->name = strdup(tname); + pTbReq->ctb.suid = suid; + pTbReq->ctb.tagNum = tagNum; + if (sname) pTbReq->ctb.stbName = strdup(sname); + pTbReq->ctb.pTag = (uint8_t*)pTag; + pTbReq->ctb.tagName = taosArrayDup(tagName); + pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL; + pTbReq->commentLen = -1; + + return; +} - pBlock->vgId = vgId; +int32_t insMemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) { + SMemParam* pa = (SMemParam*)param; + SRowBuilder* rb = pa->rb; - if (pBlock->pTableMeta) { - pBlock->pTableMeta->uid = uid; - pBlock->pTableMeta->vgId = vgId; + if (value == NULL) { // it is a null data + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx); + return TSDB_CODE_SUCCESS; } - memset(pBlock->pData, 0, sizeof(SSubmitBlk)); + if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { + const char* rowEnd = tdRowEnd(rb->pBuf); + STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len); + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); + } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) { + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t output = 0; + const char* rowEnd = tdRowEnd(rb->pBuf); + if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { + if (errno == E2BIG) { + return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name); + } + char buf[512] = {0}; + snprintf(buf, tListLen(buf), "%s", strerror(errno)); + return buildSyntaxErrMsg(pMsgBuf, buf, value); + } + varDataSetLen(rowEnd, output); + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); + } else { + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx); + } return TSDB_CODE_SUCCESS; } -STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; } +int32_t insCheckTimestamp(STableDataBlocks* pDataBlocks, const char* start) { + // once the data block is disordered, we do NOT keep previous timestamp any more + if (!pDataBlocks->ordered) { + return TSDB_CODE_SUCCESS; + } -void qFreeStmtDataBlock(void* pDataBlock) { - if (pDataBlock == NULL) { - return; + TSKEY k = *(TSKEY*)start; + if (k <= pDataBlocks->prevTS) { + pDataBlocks->ordered = false; } - taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta); - taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData); - taosMemoryFreeClear(pDataBlock); + pDataBlocks->prevTS = k; + return TSDB_CODE_SUCCESS; } -void qDestroyStmtDataBlock(void* pBlock) { - if (pBlock == NULL) { - return; +static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) { + SSubmitReq* submit = (SSubmitReq*)blocks->pData; + submit->header.vgId = htonl(blocks->vg.vgId); + submit->header.contLen = htonl(blocks->size); + submit->length = submit->header.contLen; + submit->numOfBlocks = htonl(blocks->numOfTables); + SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); + int32_t numOfBlocks = blocks->numOfTables; + while (numOfBlocks--) { + int32_t dataLen = blk->dataLen; + int32_t schemaLen = blk->schemaLen; + blk->uid = htobe64(blk->uid); + blk->suid = htobe64(blk->suid); + blk->sversion = htonl(blk->sversion); + blk->dataLen = htonl(blk->dataLen); + blk->schemaLen = htonl(blk->schemaLen); + blk->numOfRows = htonl(blk->numOfRows); + blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen); } +} - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - - pDataBlock->cloned = false; - destroyDataBlock(pDataBlock); +int32_t insBuildOutput(SInsertParseContext* pCxt) { + size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks); + pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + if (NULL == pCxt->pOutput->pDataBlocks) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + for (size_t i = 0; i < numOfVg; ++i) { + STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i); + SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); + dst->numOfTables = src->numOfTables; + dst->size = src->size; + TSWAP(dst->pData, src->pData); + buildMsgHeader(src, dst); + taosArrayPush(pCxt->pOutput->pDataBlocks, &dst); + } + return TSDB_CODE_SUCCESS; }