提交 64cf2483 编写于 作者: X Xiaoyu Wang

enh: insert parser refactor

上级 12fd4bfc
...@@ -169,7 +169,7 @@ typedef struct { ...@@ -169,7 +169,7 @@ typedef struct {
// #define TD_ROW_VER(r) ((r)->ver) // #define TD_ROW_VER(r) ((r)->ver)
#define TD_ROW_KEY_ADDR(r) (r) #define TD_ROW_KEY_ADDR(r) (r)
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and // N.B. If without STSchema, insGetExtendedRowSize() is used to get the rowMaxBytes and
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined. // (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) ((s)->tlen + TD_ROW_HEAD_LEN) #define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) ((s)->tlen + TD_ROW_HEAD_LEN)
......
...@@ -13,17 +13,30 @@ ...@@ -13,17 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_DATABLOCKMGT_H #ifndef TDENGINE_PAR_INSERT_UTIL_H
#define TDENGINE_DATABLOCKMGT_H #define TDENGINE_PAR_INSERT_UTIL_H
#include "catalog.h" #include "parUtil.h"
#include "os.h"
#include "query.h" struct SToken;
#include "tname.h"
#include "ttypes.h"
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) #define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
#define NEXT_TOKEN(pSql, sToken) \
do { \
int32_t index = 0; \
sToken = tStrGetToken(pSql, &index, false); \
pSql += index; \
} while (0)
#define CHECK_CODE(expr) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
return code; \
} \
} while (0)
typedef enum EOrderStatus { typedef enum EOrderStatus {
ORDER_STATUS_UNKNOWN = 0, ORDER_STATUS_UNKNOWN = 0,
ORDER_STATUS_ORDERED = 1, ORDER_STATUS_ORDERED = 1,
...@@ -60,6 +73,49 @@ typedef struct SParsedDataColInfo { ...@@ -60,6 +73,49 @@ typedef struct SParsedDataColInfo {
int8_t orderStatus; // bound columns int8_t orderStatus; // bound columns
} SParsedDataColInfo; } SParsedDataColInfo;
typedef struct SInsertParseBaseContext {
SParseContext *pComCxt;
char *pSql;
SMsgBuf msg;
} SInsertParseBaseContext;
typedef struct SInsertParseContext {
SParseContext *pComCxt; // input
char *pSql; // input
SMsgBuf msg; // input
STableMeta *pTableMeta; // each table
SParsedDataColInfo tags; // each table
SVCreateTbReq createTblReq; // each table
SHashObj *pVgroupsHashObj; // global
SHashObj *pTableBlockHashObj; // global
SHashObj *pSubTableHashObj; // global
SArray *pVgDataBlocks; // global
SHashObj *pTableNameHashObj; // global
SHashObj *pDbFNameHashObj; // global
int32_t totalNum;
SVnodeModifOpStmt *pOutput;
SStmtCallback *pStmtCb;
SParseMetaCache *pMetaCache;
char sTableName[TSDB_TABLE_NAME_LEN];
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
int64_t memElapsed;
int64_t parRowElapsed;
} SInsertParseContext;
typedef struct SInsertParseSyntaxCxt {
SParseContext *pComCxt;
char *pSql;
SMsgBuf msg;
SParseMetaCache *pMetaCache;
} SInsertParseSyntaxCxt;
typedef struct SMemParam {
SRowBuilder *rb;
SSchema *schema;
int32_t toffset;
col_id_t colIdx;
} SMemParam;
typedef struct { typedef struct {
uint8_t rowType; // default is 0, that is SDataRow uint8_t rowType; // default is 0, that is SDataRow
int32_t rowSize; int32_t rowSize;
...@@ -84,65 +140,28 @@ typedef struct STableDataBlocks { ...@@ -84,65 +140,28 @@ typedef struct STableDataBlocks {
SRowBuilder rowBuilder; SRowBuilder rowBuilder;
} STableDataBlocks; } STableDataBlocks;
static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { int32_t insGetExtendedRowSize(STableDataBlocks *pBlock);
STableComInfo *pTableInfo = &pBlock->pTableMeta->tableInfo; void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, col_id_t *colIdx);
ASSERT(pBlock->rowSize == pTableInfo->rowSize); int32_t insSetBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows);
return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen + int32_t insSchemaIdxCompar(const void *lhs, const void *rhs);
(int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1); int32_t insBoundIdxCompar(const void *lhs, const void *rhs);
} void insSetBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
void insDestroyBlockArrayList(SArray *pDataBlockList);
static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, void insDestroyBlockHashmap(SHashObj *pDataBlockHash);
col_id_t *colIdx) { int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
col_id_t schemaIdx = 0; int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
if (IS_DATA_COL_ORDERED(spd)) { int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks,
schemaIdx = spd->boundColumns[idx]; SArray *pBlockList, SVCreateTbReq *pCreateTbReq);
if (TD_IS_TP_ROW_T(rowType)) { int32_t insMergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks);
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
*colIdx = schemaIdx; int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
} else { int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);
*toffset = idx * sizeof(SKvRowIdx); // the offset of SKvRowIdx int32_t insFindCol(struct SToken *pColname, int32_t start, int32_t end, SSchema *pSchema);
*colIdx = idx; void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
} SArray *tagName, uint8_t tagNum);
} else { int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param);
ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx); int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx; int32_t insBuildOutput(SInsertParseContext *pCxt);
if (TD_IS_TP_ROW_T(rowType)) { void insDestroyDataBlock(STableDataBlocks *pDataBlock);
*toffset = (spd->cols + schemaIdx)->toffset;
*colIdx = schemaIdx; #endif // TDENGINE_PAR_INSERT_UTIL_H
} else {
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SKvRowIdx);
*colIdx = (spd->colIdxInfo + idx)->finalIdx;
}
}
}
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows) {
pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid);
pBlocks->uid = dataBuf->pTableMeta->uid;
pBlocks->sversion = dataBuf->pTableMeta->sversion;
pBlocks->schemaLen = dataBuf->createTbReqLen;
if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
pBlocks->numOfRows += numOfRows;
return TSDB_CODE_SUCCESS;
}
}
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs);
void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
void destroyBlockArrayList(SArray *pDataBlockList);
void destroyBlockHashmap(SHashObj *pDataBlockHash);
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows);
int32_t getDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks, SArray *pBlockList,
SVCreateTbReq *pCreateTbReq);
int32_t mergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks);
int32_t buildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
#endif // TDENGINE_DATABLOCKMGT_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "parInsertUtil.h"
#include "parInt.h"
#include "parToken.h"
#include "ttime.h"
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen) {
SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
SToken sToken;
int32_t code = 0;
char* tbName = NULL;
NEXT_TOKEN(pTableName, sToken);
if (sToken.n == 0) {
return buildInvalidOperationMsg(&msg, "empty table name");
}
code = insCreateSName(pName, &sToken, acctId, dbName, &msg);
if (code) {
return code;
}
NEXT_TOKEN(pTableName, sToken);
if (sToken.n > 0) {
return buildInvalidOperationMsg(&msg, "table name format is wrong");
}
return TSDB_CODE_SUCCESS;
}
typedef struct SmlExecTableHandle {
SParsedDataColInfo tags; // each table
SVCreateTbReq createTblReq; // each table
} SmlExecTableHandle;
typedef struct SmlExecHandle {
SHashObj* pBlockHash;
SmlExecTableHandle tableExecHandle;
SQuery* pQuery;
} SSmlExecHandle;
static void smlDestroyTableHandle(void* pHandle) {
SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle;
destroyBoundColumnInfo(&handle->tags);
tdDestroySVCreateTbReq(&handle->createTblReq);
}
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema, bool isTag) {
col_id_t nCols = pColList->numOfCols;
pColList->numOfBound = 0;
pColList->boundNullLen = 0;
memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
for (col_id_t i = 0; i < nCols; ++i) {
pColList->cols[i].valStat = VAL_STAT_NONE;
}
bool isOrdered = true;
col_id_t lastColIdx = -1; // last column found
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv* kv = taosArrayGetP(cols, i);
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
col_id_t t = lastColIdx + 1;
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, nCols, pSchema));
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, nCols);
if (index < 0 && t > 0) {
index = insFindCol(&sToken, 0, t, pSchema);
isOrdered = false;
}
if (index < 0) {
uError("smlBoundColumnData. index:%d", index);
return TSDB_CODE_SML_INVALID_DATA;
}
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
uError("smlBoundColumnData. already set. index:%d", index);
return TSDB_CODE_SML_INVALID_DATA;
}
lastColIdx = index;
pColList->cols[index].valStat = VAL_STAT_HAS;
pColList->boundColumns[pColList->numOfBound] = index;
++pColList->numOfBound;
switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
break;
}
}
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
if (!isOrdered) {
pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
if (NULL == pColList->colIdxInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].schemaColIdx = pColList->boundColumns[i];
pColIdx[i].boundIdx = i;
}
taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar);
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].finalIdx = i;
}
taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar);
}
if (pColList->numOfCols > pColList->numOfBound) {
memset(&pColList->boundColumns[pColList->numOfBound], 0,
sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief No json tag for schemaless
*
* @param cols
* @param tags
* @param pSchema
* @param ppTag
* @param msg
* @return int32_t
*/
static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
SMsgBuf* msg) {
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
if (!pTagArray) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
*tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
if (!*tagName) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < tags->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
SSmlKv* kv = taosArrayGetP(cols, i);
taosArrayPush(*tagName, pTagSchema->name);
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
// strcpy(val.colName, pTagSchema->name);
if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
val.pData = (uint8_t*)kv->value;
val.nData = kv->length;
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
int32_t output = 0;
void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output)) {
if (errno == E2BIG) {
taosMemoryFree(p);
code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
goto end;
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
taosMemoryFree(p);
code = buildSyntaxErrMsg(msg, buf, kv->value);
goto end;
}
val.pData = p;
val.nData = output;
} else {
memcpy(&val.i64, &(kv->value), kv->length);
}
taosArrayPush(pTagArray, &val);
}
code = tTagNew(pTagArray, 1, false, ppTag);
end:
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
if (p->type == TSDB_DATA_TYPE_NCHAR) {
taosMemoryFree(p->pData);
}
}
taosArrayDestroy(pTagArray);
return code;
}
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
char* tableName, const char* sTableName, int32_t sTableNameLen, char* msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
insSetBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, true);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "bound tags error");
return ret;
}
STag* pTag = NULL;
SArray* tagName = NULL;
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &tagName, &pBuf);
if (ret != TSDB_CODE_SUCCESS) {
taosArrayDestroy(tagName);
return ret;
}
insBuildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
pTableMeta->tableInfo.numOfTags);
taosArrayDestroy(tagName);
smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1);
memcpy(smlHandle->tableExecHandle.createTblReq.ctb.stbName, sTableName, sTableNameLen);
smlHandle->tableExecHandle.createTblReq.ctb.stbName[sTableNameLen] = 0;
STableDataBlocks* pDataBlock = NULL;
ret = insGetDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "create data block error");
return ret;
}
SSchema* pSchema = getTableColumnSchema(pTableMeta);
ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema, false);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "bound cols error");
return ret;
}
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder};
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
int32_t rowNum = taosArrayGetSize(cols);
if (rowNum <= 0) {
return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
}
ret = insAllocateMemForSize(pDataBlock, extendedRowSize * rowNum);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "allocate memory error");
return ret;
}
for (int32_t r = 0; r < rowNum; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
void* rowData = taosArrayGetP(cols, r);
size_t rowDataSize = 0;
if (format) {
rowDataSize = taosArrayGetSize(rowData);
}
// 1. set the parsed value from sql string
for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
param.schema = pColSchema;
insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);
SSmlKv* kv = NULL;
if (format) {
if (j < rowDataSize) {
kv = taosArrayGetP(rowData, j);
if (rowDataSize != spd->numOfBound && j != 0 &&
(kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
kv = NULL;
} else {
j++;
}
}
} else {
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
if (p) kv = *p;
}
if (kv) {
int32_t colLen = kv->length;
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
// uError("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
// uError("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
}
if (IS_VAR_DATA_TYPE(kv->type)) {
insMemRowAppend(&pBuf, kv->value, colLen, &param);
} else {
insMemRowAppend(&pBuf, &(kv->value), colLen, &param);
}
} else {
pBuilder->hasNone = true;
}
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
TSKEY tsKey = TD_ROW_KEY(row);
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
}
}
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
pBuilder->hasNone = true;
}
tdSRowEnd(pBuilder);
pDataBlock->size += extendedRowSize;
}
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, rowNum)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;
}
void* smlInitHandle(SQuery* pQuery) {
SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
if (!handle) return NULL;
handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
handle->pQuery = pQuery;
return handle;
}
void smlDestroyHandle(void* pHandle) {
if (!pHandle) return;
SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
insDestroyBlockHashmap(handle->pBlockHash);
smlDestroyTableHandle(&handle->tableExecHandle);
taosMemoryFree(handle);
}
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "parInsertUtil.h"
#include "parInt.h"
#include "parToken.h"
#include "query.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"
typedef struct SKvParam {
int16_t pos;
SArray* pTagVals;
SSchema* schema;
char buf[TSDB_MAX_TAGS_LEN];
} SKvParam;
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
SVnodeModifOpStmt* modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
int32_t code = 0;
SInsertParseContext insertCtx = {
.pVgroupsHashObj = pVgHash,
.pTableBlockHashObj = pBlockHash,
.pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
};
// merge according to vgId
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
CHECK_CODE(
insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
}
CHECK_CODE(insBuildOutput(&insertCtx));
insDestroyBlockArrayList(insertCtx.pVgDataBlocks);
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR;
}
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
if (!pTagArray) {
return buildInvalidOperationMsg(&pBuf, "out of memory");
}
SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
if (!tagName) {
return buildInvalidOperationMsg(&pBuf, "out of memory");
}
int32_t code = TSDB_CODE_SUCCESS;
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
bool isJson = false;
STag* pTag = NULL;
for (int c = 0; c < tags->numOfBound; ++c) {
if (bind[c].is_null && bind[c].is_null[0]) {
continue;
}
SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
int32_t colLen = pTagSchema->bytes;
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
colLen = bind[c].length[0];
}
taosArrayPush(tagName, pTagSchema->name);
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
goto end;
}
isJson = true;
char* tmp = taosMemoryCalloc(1, colLen + 1);
memcpy(tmp, bind[c].buffer, colLen);
code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
taosMemoryFree(tmp);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
} else {
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
// strcpy(val.colName, pTagSchema->name);
if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
val.pData = (uint8_t*)bind[c].buffer;
val.nData = colLen;
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
int32_t output = 0;
void* p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
if (errno == E2BIG) {
taosMemoryFree(p);
code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
goto end;
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
taosMemoryFree(p);
code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
goto end;
}
val.pData = p;
val.nData = output;
} else {
memcpy(&val.i64, bind[c].buffer, colLen);
}
taosArrayPush(pTagArray, &val);
}
}
if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
goto end;
}
SVCreateTbReq tbReq = {0};
insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags);
code = insBuildCreateTbMsg(pDataBlock, &tbReq);
tdDestroySVCreateTbReq(&tbReq);
end:
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
if (p->type == TSDB_DATA_TYPE_NCHAR) {
taosMemoryFreeClear(p->pData);
}
}
taosArrayDestroy(pTagArray);
taosArrayDestroy(tagName);
return code;
}
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder};
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t rowNum = bind->num;
CHECK_CODE(
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num));
for (int32_t r = 0; r < bind->num; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
for (int c = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
if (bind[c].num != rowNum) {
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
}
param.schema = pColSchema;
insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);
if (bind[c].is_null && bind[c].is_null[r]) {
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
}
CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, &param));
} else {
if (bind[c].buffer_type != pColSchema->type) {
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
}
int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = bind[c].length[r];
}
CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
}
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
TSKEY tsKey = TD_ROW_KEY(row);
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
}
}
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
pBuilder->hasNone = true;
}
tdSRowEnd(pBuilder);
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
#endif
pDataBlock->size += extendedRowSize;
}
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
int32_t rowNum) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder};
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
bool rowStart = (0 == colIdx);
bool rowEnd = ((colIdx + 1) == spd->numOfBound);
if (rowStart) {
CHECK_CODE(
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num));
}
for (int32_t r = 0; r < bind->num; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r); // skip the SSubmitBlk header
if (rowStart) {
tdSRowResetBuf(pBuilder, row);
} else {
tdSRowGetBuf(pBuilder, row);
}
SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]];
if (bind->num != rowNum) {
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
}
param.schema = pColSchema;
insGetSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);
if (bind->is_null && bind->is_null[r]) {
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
}
CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, &param));
} else {
if (bind->buffer_type != pColSchema->type) {
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
}
int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = bind->length[r];
}
CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, &param));
}
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
TSKEY tsKey = TD_ROW_KEY(row);
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
}
// set the null value for the columns that do not assign values
if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
pBuilder->hasNone = true;
}
if (rowEnd) {
tdSRowEnd(pBuilder);
}
#ifdef TD_DEBUG_PRINT_ROW
if (rowEnd) {
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
}
#endif
}
if (rowEnd) {
pDataBlock->size += extendedRowSize * bind->num;
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf,
"too many rows in sql, total number of rows should be less than INT32_MAX");
}
}
return TSDB_CODE_SUCCESS;
}
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
uint8_t timePrec) {
if (fields) {
*fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
if (NULL == *fields) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSchema* schema = &pSchema[boundInfo->boundColumns[0]];
if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
(*fields)[0].precision = timePrec;
}
for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
schema = &pSchema[boundInfo->boundColumns[i]];
strcpy((*fields)[i].name, schema->name);
(*fields)[i].type = schema->type;
(*fields)[i].bytes = schema->bytes;
}
}
*fieldNum = boundInfo->numOfBound;
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR;
}
if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) {
return TSDB_CODE_TSC_STMT_API_ERROR;
}
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
if (tags->numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0));
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
*fieldNum = 0;
if (fields) {
*fields = NULL;
}
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields,
pDataBlock->pTableMeta->tableInfo.precision));
return TSDB_CODE_SUCCESS;
}
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
STableDataBlocks* pBlock = (STableDataBlocks*)block;
if (keepBuf) {
taosMemoryFreeClear(pBlock->pData);
pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
if (NULL == pBlock->pData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
} else {
pBlock->pData = NULL;
}
pBlock->ordered = true;
pBlock->prevTS = INT64_MIN;
pBlock->size = sizeof(SSubmitBlk);
pBlock->tsSource = -1;
pBlock->numOfTables = 1;
pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
pBlock->headerSize = pBlock->size;
pBlock->createTbReqLen = 0;
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
return TSDB_CODE_SUCCESS;
}
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
*pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
if (NULL == *pDst) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
((STableDataBlocks*)(*pDst))->cloned = true;
STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst);
if (pBlock->pTableMeta) {
void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta));
if (NULL == pNewMeta) {
taosMemoryFreeClear(*pDst);
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta));
pBlock->pTableMeta = pNewMeta;
}
return qResetStmtDataBlock(*pDst, false);
}
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
int32_t code = qCloneStmtDataBlock(pDst, pSrc);
if (code) {
return code;
}
STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
if (NULL == pBlock->pData) {
qFreeStmtDataBlock(pBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}
pBlock->vgId = vgId;
if (pBlock->pTableMeta) {
pBlock->pTableMeta->uid = uid;
pBlock->pTableMeta->vgId = vgId;
}
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
return TSDB_CODE_SUCCESS;
}
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; }
void qFreeStmtDataBlock(void* pDataBlock) {
if (pDataBlock == NULL) {
return;
}
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta);
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
taosMemoryFreeClear(pDataBlock);
}
void qDestroyStmtDataBlock(void* pBlock) {
if (pBlock == NULL) {
return;
}
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
pDataBlock->cloned = false;
insDestroyDataBlock(pDataBlock);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册