提交 93ca1168 编写于 作者: D dapan1121

stmt

上级 1c499c94
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "querynodes.h"
#include "query.h"
typedef struct SStmtCallback {
TAOS_STMT* pStmt;
......
......@@ -23,7 +23,7 @@ extern "C" {
typedef enum {
STMT_TYPE_INSERT = 1,
STMT_TYPE_MULTI_INSERT,
STMT_TYPE_QUERY,
STMT_TYPE_QUERY
} STMT_TYPE;
typedef enum {
......@@ -84,21 +84,24 @@ typedef struct STscStmt {
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define STMT_SWITCH_STATUS(_stmt, _newstatus, _errcode) do {
switch (_newstatus) {
case STMT_INIT:
if ((_stmt)->status != 0) return (_errcode);
break;
case STMT_PREPARE:
if ((_stmt)->status != STMT_INIT) STMT_ERR_RET(_errcode);
break;
case STMT_SETTBNAME:
break;
default:
STMT_ERR_RET(_errcode);
break;
}
} while (0)
#define STMT_SWITCH_STATUS(_stmt, _newstatus, _errcode) \
do { \
switch (_newstatus) { \
case STMT_INIT: \
if ((_stmt)->sql.status != 0) return (_errcode); \
break; \
case STMT_PREPARE: \
if ((_stmt)->sql.status != STMT_INIT) STMT_ERR_RET(_errcode); \
break; \
case STMT_SETTBNAME: \
break; \
default: \
STMT_ERR_RET(_errcode); \
break; \
} \
\
(_stmt)->sql.status = _newstatus; \
} while (0)
TAOS_STMT *stmtInit(TAOS *taos);
......@@ -106,7 +109,6 @@ int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt);
char *stmtErrstr(TAOS_STMT *stmt);
int stmtAffectedRows(TAOS_STMT *stmt);
int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind);
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
int stmtIsInsert(TAOS_STMT *stmt, int *insert);
......
......@@ -615,7 +615,15 @@ int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) {
return terrno;
}
return stmtBind(stmt, bind);
TAOS_MULTI_BIND mbind = {0};
mbind.buffer_type = bind->buffer_type;
mbind.buffer = bind->buffer;
mbind.buffer_length = bind->buffer_length;
mbind.length = bind->length;
mbind.is_null = bind->is_null;
mbind.num = 1;
return stmtBindBatch(stmt, &mbind);
}
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
......@@ -634,6 +642,10 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
return stmtBindBatch(stmt, bind);
}
int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx) {
return stmtBindBatch(stmt, bind); /* TODO */
}
int taos_stmt_add_batch(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
......
......@@ -75,6 +75,8 @@ int32_t stmtParseSql(STscStmt* pStmt) {
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
}
STMT_ERR_RET(stmtCacheBlock(pStmt));
return TSDB_CODE_SUCCESS;
}
......@@ -152,8 +154,20 @@ int32_t stmtCacheBlock(STscStmt *pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanExecCtx(STscStmt* pStmt, bool keepTable) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->sql.pQuery->pRoot;
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
pStmt->bind.tbUid = 0;
pStmt->bind.tbSuid = 0;
pStmt->bind.tbType = 0;
pStmt->bind.needParse = true;
taosMemoryFreeClear(pStmt->bind.tbName);
destroyBoundColumnInfo(pStmt->bind.boundTags);
taosMemoryFreeClear(pStmt->bind.boundTags);
}
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
......@@ -178,13 +192,33 @@ int32_t stmtCleanExecCtx(STscStmt* pStmt, bool keepTable) {
taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
pStmt->bind.tbUid = 0;
pStmt->bind.tbSuid = 0;
pStmt->bind.tbType = 0;
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
taosMemoryFree(pStmt->sql.sqlStr);
qDestroyQuery(pStmt->sql.pQuery);
void *pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
while (pIter) {
SStmtTableCache* pCache = *(SStmtTableCache**)pIter;
pCache->pDataBlock->cloned = false;
destroyDataBlock(pCache->pDataBlock);
destroyBoundColumnInfo(pCache->boundTags);
destroyBoundColumnInfo(pStmt->bind.boundTags);
taosMemoryFreeClear(pStmt->bind.boundTags);
pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
}
taosHashCleanup(pStmt->sql.pTableCache);
pStmt->sql.pTableCache = NULL;
memset(&pStmt->sql, 0, sizeof(pStmt->sql));
STMT_ERR_RET(stmtCleanExecInfo(pStmt, false));
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
......@@ -281,6 +315,10 @@ TAOS_STMT *stmtInit(TAOS *taos) {
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt->sql.status >= STMT_PREPARE) {
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
}
STMT_SWITCH_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_API_ERROR);
pStmt->sql.sqlStr = strndup(sql, length);
......@@ -434,12 +472,7 @@ int stmtExec(TAOS_STMT *stmt) {
_return:
stmtCleanExecCtx(pStmt, (code ? false : true));
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
pStmt->bind.needParse = true;
stmtCleanExecInfo(pStmt, (code ? false : true));
++pStmt->sql.runTimes;
......@@ -466,6 +499,10 @@ int stmtAffectedRows(TAOS_STMT *stmt) {
}
int stmtIsInsert(TAOS_STMT *stmt, int *insert) {
STscStmt* pStmt = (STscStmt*)stmt;
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
return TSDB_CODE_SUCCESS;
}
......
......@@ -131,7 +131,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs);
void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
void destroyBoundColumnInfo(void* pBoundInfo);
void destroyBlockArrayList(SArray* pDataBlockList);
void destroyBlockHashmap(SHashObj* pDataBlockHash);
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
......
......@@ -21,13 +21,30 @@ extern "C" {
#endif
#include "parser.h"
#include "parToken.h"
#include "parUtil.h"
typedef struct SKvParam {
SKVRowBuilder *builder;
SSchema *schema;
char buf[TSDB_MAX_TAGS_LEN];
} SKvParam;
#define CHECK_CODE(expr) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
return code; \
} \
} while (0)
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, char* dbName, SMsgBuf* pMsgBuf);
int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf);
int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param);
#ifdef __cplusplus
}
......
......@@ -40,14 +40,6 @@
sToken = tStrGetToken(pSql, &index, false); \
} while (0)
#define CHECK_CODE(expr) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
return code; \
} \
} while (0)
typedef struct SInsertParseContext {
SParseContext* pComCxt; // input
char *pSql; // input
......@@ -163,7 +155,8 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD
return TSDB_CODE_SUCCESS;
}
int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, char* dbName, SMsgBuf* pMsgBuf) {
int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
const char* msg1 = "name too long";
const char* msg2 = "invalid database name";
const char* msg3 = "db is not specified";
......@@ -720,13 +713,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return TSDB_CODE_SUCCESS;
}
typedef struct SKvParam {
SKVRowBuilder *builder;
SSchema *schema;
char buf[TSDB_MAX_TAGS_LEN];
} SKvParam;
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param) {
int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param) {
SKvParam* pa = (SKvParam*) param;
int8_t type = pa->schema->type;
......@@ -1195,3 +1182,52 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
destroyInsertParseContext(&context);
return code;
}
int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) {
SMsgBuf msg = {.buf = msgBuf, .len =msgBufLen};
SToken sToken;
int32_t code = 0;
char *tbName = NULL;
NEXT_TOKEN(pTableName, sToken);
if (sToken.n == 0) {
return buildInvalidOperationMsg(&msg, "empty table name");
}
code = createSName(pName, &sToken, acctId, dbName, &msg);
if (code) {
return code;
}
NEXT_TOKEN(pTableName, sToken);
if (SToken.n > 0) {
return buildInvalidOperationMsg(&msg, "table name format is wrong");
}
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
int32_t code = 0;
SInsertParseContext insertCtx = {
.pVgroupsHashObj = pVgHash,
.pTableBlockHashObj = pBlockHash,
.pOutput = pQuery->pRoot
};
// merge according to vgId
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
CHECK_CODE_GOTO(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return);
}
CHECK_CODE(buildOutput(&insertCtx));
return TSDB_CODE_SUCCESS;
}
......@@ -18,6 +18,7 @@
#include "catalog.h"
#include "parUtil.h"
#include "querynodes.h"
#include "parInt.h"
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
......@@ -102,10 +103,12 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs) {
}
}
void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
if (NULL == pColList) {
void destroyBoundColumnInfo(void* pBoundInfo) {
if (NULL == pBoundInfo) {
return;
}
SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
taosMemoryFreeClear(pColList->boundColumns);
taosMemoryFreeClear(pColList->cols);
......@@ -567,3 +570,167 @@ int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo
pColInfo->boundNullLen);
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtTagsValue(STableDataBlocks *pDataBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND *bind, char *msgBuf, int32_t msgBufLen){
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR;
}
SKVRowBuilder tagBuilder;
if (tdInitKVRowBuilder(&tagBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
SKvParam param = {.builder = &tagBuilder};
for (int c = 0; c < tags->numOfBound; ++c) {
if (bind[c].is_null && bind[c].is_null[0]) {
KvRowAppend(&pBuf, NULL, 0, &param);
continue;
}
SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1
param.schema = pTagSchema;
int32_t colLen = pTagSchema->bytes;
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
colLen = bind[c].length[0];
}
CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, &param));
}
SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
if (NULL == row) {
tdDestroyKVRowBuilder(&tagBuilder);
return buildInvalidOperationMsg(&pBuf, "tag value expected");
}
tdSortKVRowByColIdx(row);
SVCreateTbReq tbReq = {0};
CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid));
CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq));
destroyCreateSubTbReq(&tbReq);
tdDestroyKVRowBuilder(&tagBuilder);
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtColsValue(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) {
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder};
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num);
for (int32_t r = 0; r < bind->num; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
// 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
param.schema = pColSchema;
getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);
if (bind[c].is_null && bind[c].is_null[r]) {
CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
} else {
int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = bind[c].length[r];
}
CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
}
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
TSKEY tsKey = TD_ROW_KEY(row);
checkTimestamp(pDataBlock, (const char *)&tsKey);
}
}
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE
tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
spd->cols[i].toffset);
}
}
}
pDataBlock->size += extendedRowSize;
}
SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
}
return TSDB_CODE_SUCCESS;
}
int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) {
*fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
if (NULL == *fields) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1];
strcpy((*fields)[i].name, pTagSchema->name);
(*fields)[i].type = pTagSchema->type;
(*fields)[i].bytes = pTagSchema->bytes;
}
*fieldNum = boundInfo->numOfBound;
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtTagFields(STableDataBlocks *pDataBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) {
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR;
}
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
if (tags->numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtColFields(STableDataBlocks *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields) {
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
......@@ -51,212 +51,6 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) {
return code;
}
int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) {
SMsgBuf msg = {.buf = msgBuf, .len =msgBufLen};
SToken sToken;
int32_t code = 0;
char *tbName = NULL;
NEXT_TOKEN(pTableName, sToken);
if (sToken.n == 0) {
return buildInvalidOperationMsg(&msg, "empty table name");
}
code = createSName(pName, &sToken, acctId, dbName, &msg);
if (code) {
return code;
}
NEXT_TOKEN(pTableName, sToken);
if (SToken.n > 0) {
return buildInvalidOperationMsg(&msg, "table name format is wrong");
}
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtTagsValue(STableDataBlocks *pDataBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND *bind, char *msgBuf, int32_t msgBufLen){
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR;
}
SKVRowBuilder tagBuilder;
if (tdInitKVRowBuilder(&tagBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
SKvParam param = {.builder = &tagBuilder};
for (int c = 0; c < tags->numOfBound; ++c) {
if (bind[c].is_null && bind[c].is_null[0]) {
KvRowAppend(&pBuf, NULL, 0, &param);
continue;
}
SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1
param.schema = pTagSchema;
int32_t colLen = pTagSchema->bytes;
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
colLen = bind[c].length[0];
}
CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, &param));
}
SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
if (NULL == row) {
tdDestroyKVRowBuilder(&tagBuilder);
return buildInvalidOperationMsg(&pBuf, "tag value expected");
}
tdSortKVRowByColIdx(row);
SVCreateTbReq tbReq = {0};
CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid));
CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq));
destroyCreateSubTbReq(&tbReq);
tdDestroyKVRowBuilder(&tagBuilder);
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtColsValue(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) {
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder};
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num);
for (int32_t r = 0; r < bind->num; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
// 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
param.schema = pColSchema;
getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);
if (bind[c].is_null && bind[c].is_null[r]) {
CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
} else {
int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = bind[c].length[r];
}
CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
}
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
TSKEY tsKey = TD_ROW_KEY(row);
checkTimestamp(pDataBlock, (const char *)&tsKey);
}
}
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE
tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
spd->cols[i].toffset);
}
}
}
pDataBlock->size += extendedRowSize;
}
SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
}
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
int32_t code = 0;
SInsertParseContext insertCtx = {
.pVgroupsHashObj = pVgHash,
.pTableBlockHashObj = pBlockHash,
.pOutput = pQuery->pRoot
};
// merge according to vgId
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
CHECK_CODE_GOTO(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return);
}
CHECK_CODE(buildOutput(&insertCtx));
return TSDB_CODE_SUCCESS;
}
int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) {
*fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
if (NULL == *fields) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1];
strcpy((*fields)[i].name, pTagSchema->name);
(*fields)[i].type = pTagSchema->type;
(*fields)[i].bytes = pTagSchema->bytes;
}
*fieldNum = boundInfo->numOfBound;
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtTagFields(STableDataBlocks *pDataBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) {
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR;
}
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
if (tags->numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtColFields(STableDataBlocks *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields) {
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
void qDestroyQuery(SQuery* pQueryNode) {
if (NULL == pQueryNode) {
return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册