diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 3e78da63de2f3a9726aad07ef7c7cc615ebda56e..5e6c5233f7a9d5d373fd9d8222df102499c4cb48 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -280,13 +280,22 @@ typedef struct SVgDataBlocks { char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ... } SVgDataBlocks; +typedef struct SStmtDataCtx { + uint64_t tbUid; + SHashObj* pVgroupsHashObj; // global + SHashObj* pTableBlockHashObj; // global + SHashObj* pSubTableHashObj; // global + SArray* pTableDataBlocks; // global +} SStmtDataCtx; + typedef struct SVnodeModifOpStmt { - ENodeType nodeType; - ENodeType sqlNodeType; - SArray* pDataBlocks; // data block for each vgroup, SArray. - uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert - uint32_t insertType; // insert data from [file|sql statement| bound statement] - const char* sql; // current sql statement position + ENodeType nodeType; + ENodeType sqlNodeType; + SArray* pDataBlocks; // data block for each vgroup, SArray. + uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert + uint32_t insertType; // insert data from [file|sql statement| bound statement] + const char* sql; // current sql statement position + SStmtDataCtx stmtCtx; } SVnodeModifOpStmt; typedef struct SExplainOptions { diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index fef624288a7eff2ce9ca83447d20076e15b76149..eea9da4e520978ea1265a5a50ecd20b56d5788c8 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -22,6 +22,11 @@ extern "C" { #include "querynodes.h" +typedef struct SStmtCallback { + TAOS_STMT* pStmt; + int32_t (*getTbNameFn)(TAOS_STMT*, char**); +} SStmtCallback; + typedef struct SParseContext { uint64_t requestId; int32_t acctId; @@ -34,6 +39,7 @@ typedef struct SParseContext { char *pMsg; // extended error message if exists to help identifying the problem in sql statement. int32_t msgLen; // max length of the msg struct SCatalog *pCatalog; + SStmtCallback *pStmtCb; } SParseContext; typedef struct SCmdMsgInfo { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 60535e2f493a3bcb1769f067fb7e13a6fb715452..428dca96f8738f47b8711d033f2e94e09c6b2acf 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -132,6 +132,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) #define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) #define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) +#define TSDB_CODE_TSC_STMT_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225) +#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226) // mnode-common #define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 4e3299fd1ba9cb51628ce970380a1fe512cdf546..b352b65bea180cd4fb88f3ae8a3b7d903cb74f51 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -213,6 +213,7 @@ typedef struct SRequestObj { int32_t sqlLen; int64_t self; char* msgBuf; + int32_t msgBufLen; void* pInfo; // sql parse info, generated by parser module int32_t code; SArray* dbList; diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index c29361758dda67b079ca613651550c8d3ccb5ca8..3ff7c0006b3f69f8fe9e41ecbb9441bdc52425b3 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -26,21 +26,51 @@ typedef enum { STMT_TYPE_QUERY, } STMT_TYPE; +typedef enum { + STMT_INIT = 1, + STMT_PREPARE, + STMT_SETTBNAME, + STMT_BIND, + STMT_BIND_COL, + STMT_ADD_BATCH, + STMT_EXECUTE +} STMT_STATUS; + typedef struct STscStmt { - STMT_TYPE type; - //int16_t last; - //STscObj* taos; - //SSqlObj* pSql; + STMT_TYPE type; + STMT_STATUS status; + STscObj* taos; + SRequestObj* pRequest; + SQuery* pQuery; + char* sql; + int32_t sqlLen; + char* tbName; + TAOS_BIND* bindTags; //SMultiTbStmt mtb; //SNormalStmt normal; //int numOfRows; } STscStmt; + #define STMT_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define STMT_CHK_STATUS(_stmt, _status, _v) do { + switch (_status) { + case STMT_INIT: + if ((_stmt)->status != 0) return (_v); + break; + case STMT_PREPARE: + if ((_stmt)->status != STMT_INIT) STMT_ERR_RET(_v); + break; + case STMT_SETTBNAME: + break; + } +} while (0) + + TAOS_STMT *stmtInit(TAOS *taos); int stmtClose(TAOS_STMT *stmt); int stmtExec(TAOS_STMT *stmt); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 7f413bdabcf9dc926f56f9de1c3fc9978d77d4ee..e0e2e9d2c8826a25f80c831e6cb939eb4f77aa3a 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -186,6 +186,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty pRequest->pTscObj = pObj; pRequest->body.fp = fp; // not used it yet pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); + pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE; tsem_init(&pRequest->body.rspSem, 0, 0); registerRequest(pRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0252dfe6347fdd4244a561fec90da03b510b57cc..37597ba8e98efa07c68809b23c850f78eb0fcf8d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -143,7 +143,7 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* return TSDB_CODE_SUCCESS; } -int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) { +int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) { STscObj* pTscObj = pRequest->pTscObj; SParseContext cxt = { @@ -156,6 +156,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) { .pMsg = pRequest->msgBuf, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .pTransporter = pTscObj->pAppInfo->pTransporter, + .pStmtCb = pStmtCb, }; cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); @@ -289,7 +290,7 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); if (TSDB_CODE_SUCCESS == code) { - code = parseSql(pRequest, false, &pQuery); + code = parseSql(pRequest, false, &pQuery, NULL); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 84f9c7ac30042788a2d08b9321769638395db97a..56132c304109acc1a397b42ab9954aec8d126446 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -569,84 +569,80 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) { return stmtInit(taos); } -int taos_stmt_close(TAOS_STMT *stmt) { - if (stmt == NULL) { +int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { + if (stmt == NULL || sql == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - return stmtClose(stmt); + return stmtPrepare(stmt, sql, length); } -int taos_stmt_execute(TAOS_STMT *stmt) { - if (stmt == NULL) { +int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { + if (stmt == NULL || name == NULL || tags == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - return stmtExec(stmt); + return stmtSetTbNameTags(stmt, name, tags); } -char *taos_stmt_errstr(TAOS_STMT *stmt) { - if (stmt == NULL) { +int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { + if (stmt == NULL || name == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; - return NULL; + return terrno; } - return stmtErrstr(stmt); + return stmtSetTbNameTags(stmt, name, NULL); } -int taos_stmt_affected_rows(TAOS_STMT *stmt) { - if (stmt == NULL) { +int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) { + if (stmt == NULL || bind == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; - return 0; + return terrno; } - return stmtAffectedRows(stmt); + return stmtBind(stmt, bind); } -int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) { +int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { if (stmt == NULL || bind == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - return stmtBind(stmt, bind); -} - -int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { - if (stmt == NULL || sql == NULL) { - tscError("NULL parameter for %s", __FUNCTION__); + if (bind->num <= 0 || bind->num > INT16_MAX) { + tscError("invalid bind num %d", bind->num); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - return stmtPrepare(stmt, sql, length); + return stmtBindBatch(stmt, bind); } -int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { - if (stmt == NULL || name == NULL || tags == NULL) { +int taos_stmt_add_batch(TAOS_STMT *stmt) { + if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - return stmtSetTbNameTags(stmt, name, tags); + return stmtAddBatch(stmt); } -int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { - if (stmt == NULL || name == NULL) { +int taos_stmt_execute(TAOS_STMT *stmt) { + if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - return stmtSetTbNameTags(stmt, name, NULL); + return stmtExec(stmt); } int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) { @@ -669,34 +665,47 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) { return stmtGetParamNum(stmt, nums); } -int taos_stmt_add_batch(TAOS_STMT *stmt) { +TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) { if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return NULL; } - return stmtAddBatch(stmt); + return stmtUseResult(stmt); } -TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) { +char *taos_stmt_errstr(TAOS_STMT *stmt) { if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return NULL; } - return stmtUseResult(stmt); + return stmtErrstr(stmt); } -int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { - if (stmt == NULL || bind == NULL) { +int taos_stmt_affected_rows(TAOS_STMT *stmt) { + if (stmt == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return 0; + } + + return stmtAffectedRows(stmt); +} + + + + +int taos_stmt_close(TAOS_STMT *stmt) { + if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - return stmtBindBatch(stmt, bind); + return stmtClose(stmt); } diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 8c4cff92514cc8c850c9a25458865c1741cf2045..a43ffdf31e7e7a8e5e332c8223c7afb9e26a76bb 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -4,11 +4,23 @@ #include "clientStmt.h" #include "tdef.h" +int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { + STscStmt* pStmt = (STscStmt*)stmt; + + if (NULL == pStmt->tbName) { + tscError("no table name set"); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR); + } + + *tbName = pStmt->tbName; + + return TSDB_CODE_SUCCESS; +} + TAOS_STMT *stmtInit(TAOS *taos) { STscObj* pObj = (STscObj*)taos; STscStmt* pStmt = NULL; -#if 0 pStmt = taosMemoryCalloc(1, sizeof(STscStmt)); if (pStmt == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -16,38 +28,64 @@ TAOS_STMT *stmtInit(TAOS *taos) { return NULL; } pStmt->taos = pObj; + pStmt->status = STMT_INIT; - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + return pStmt; +} - if (pSql == NULL) { - free(pStmt); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscError("failed to allocate memory for statement"); - return NULL; +int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { + STscStmt* pStmt = (STscStmt*)stmt; + + STMT_CHK_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_STATUS_ERROR); + + pStmt->sql = strndup(sql, length); + pStmt->sqlLen = length; + + return TSDB_CODE_SUCCESS; +} + + +int stmtSetTbNameTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { + STscStmt* pStmt = (STscStmt*)stmt; + + STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR); + + if (tbName) { + pStmt->tbName = strdup(tbName); } - if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { - free(pSql); - free(pStmt); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscError("failed to malloc payload buffer"); - return NULL; + if (tags) { + pStmt->bindTags = tags; } - tsem_init(&pSql->rspSem, 0, 0); - pSql->signature = pSql; - pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_MAX_REPLICA; - pStmt->pSql = pSql; - pStmt->last = STMT_INIT; - pStmt->numOfRows = 0; - registerSqlObj(pSql); -#endif + return TSDB_CODE_SUCCESS; +} - return pStmt; +int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { + STscStmt* pStmt = (STscStmt*)stmt; + + STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR); + + if (NULL == pStmt->pRequest) { + SStmtCallback stmtCb = {.pStmt = stmt, .getTbNameFn = stmtGetTbName}; + + STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest)); + STMT_ERR_RET(parseSql(pStmt->pRequest, false, &pStmt->pQuery, &stmtCb)); + } + + qBindStmtData(pStmt->pQuery, bind, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen); + + return TSDB_CODE_SUCCESS; } -int stmtClose(TAOS_STMT *stmt) { + +int stmtAddBatch(TAOS_STMT *stmt) { + STscStmt* pStmt = (STscStmt*)stmt; + + STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR); + + qBuildStmtOutput(pStmt->pQuery); + return TSDB_CODE_SUCCESS; } @@ -55,23 +93,22 @@ int stmtExec(TAOS_STMT *stmt) { return TSDB_CODE_SUCCESS; } -char *stmtErrstr(TAOS_STMT *stmt) { - return NULL; -} -int stmtAffectedRows(TAOS_STMT *stmt) { +int stmtClose(TAOS_STMT *stmt) { return TSDB_CODE_SUCCESS; } -int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind) { - return TSDB_CODE_SUCCESS; -} +char *stmtErrstr(TAOS_STMT *stmt) { + STscStmt* pStmt = (STscStmt*)stmt; -int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { - return TSDB_CODE_SUCCESS; + if (stmt == NULL) { + return (char*) tstrerror(terrno); + } + + return taos_errstr(pStmt->pRequest); } -int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { +int stmtAffectedRows(TAOS_STMT *stmt) { return TSDB_CODE_SUCCESS; } @@ -83,17 +120,9 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) { return TSDB_CODE_SUCCESS; } -int stmtAddBatch(TAOS_STMT *stmt) { - return TSDB_CODE_SUCCESS; -} - TAOS_RES *stmtUseResult(TAOS_STMT *stmt) { return NULL; } -int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { - return TSDB_CODE_SUCCESS; -} - diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index ee17de50e06ba54dafb731e389937ad3e96ecef9..b3440eff70a9577226ef8d5a9149aaa622809df8 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -89,7 +89,7 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { (int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1); } -static FORCE_INLINE void getSTSRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, +static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, col_id_t *colIdx) { col_id_t schemaIdx = 0; if (IS_DATA_COL_ORDERED(spd)) { diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 9a0483cd3323411e3d4347ca771d4cf064b87be6..9be821d3210180fb2f033e9cd3c932e36a4eedb4 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -63,6 +63,7 @@ typedef struct SInsertParseContext { SArray* pVgDataBlocks; // global int32_t totalNum; SVnodeModifOpStmt* pOutput; + SStmtCallback* pStmtCb; } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param); @@ -453,8 +454,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int if (isNullStr(pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { - int64_t tmpVal = 0; - return func(pMsgBuf, &tmpVal, pSchema->bytes, param); + return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z); } return func(pMsgBuf, NULL, 0, param); @@ -872,8 +872,22 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, for (int i = 0; i < spd->numOfBound; ++i) { NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken); SSchema* pSchema = &schema[spd->boundColumns[i] - 1]; + + if (sToken.type == TK_NK_QUESTION) { + isParseBindParam = true; + if (NULL == pCxt->pStmtCb) { + return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z); + } + + continue; + } + + if (isParseBindParam) { + return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values"); + } + param.schema = pSchema; - getSTSRowAppendInfo(schema, pBuilder->rowType, spd, i, ¶m.toffset, ¶m.colIdx); + getSTSRowAppendInfo(pBuilder->rowType, spd, i, ¶m.toffset, ¶m.colIdx); CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, ¶m, &pCxt->msg)); if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { @@ -1005,11 +1019,15 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; static int32_t parseInsertBody(SInsertParseContext* pCxt) { + int32_t tbNum = 0; + // for each table while (1) { destroyInsertParseContextForTable(pCxt); SToken sToken; + char *tbName = NULL; + // pSql -> tb_name ... NEXT_TOKEN(pCxt->pSql, sToken); @@ -1021,6 +1039,21 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { break; } + if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) { + return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");; + } + + if (TK_NK_QUESTION == sToken.type) { + if (pCxt->pStmtCb) { + CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName)); + + sToken.z = tbName; + sToken.n = strlen(tbName); + } else { + return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z); + } + } + SToken tbnameToken = sToken; NEXT_TOKEN(pCxt->pSql, sToken); @@ -1046,6 +1079,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { // pSql -> (field1_value, ...) [(field1_value2, ...) ...] CHECK_CODE(parseValuesClause(pCxt, dataBuf)); pCxt->pOutput->insertType = TSDB_QUERY_TYPE_INSERT; + + tbNum++; continue; } @@ -1058,13 +1093,31 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { } // todo pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT; + + tbNum++; continue; } return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z); } + + if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { + pCxt->pOutput->stmtCtx.tbUid = pCxt->pTableMeta->uid; + pCxt->pOutput->stmtCtx.pVgroupsHashObj = pCxt->pVgroupsHashObj; + pCxt->pOutput->stmtCtx.pTableBlockHashObj = pCxt->pTableBlockHashObj; + pCxt->pOutput->stmtCtx.pSubTableHashObj = pCxt->pSubTableHashObj; + pCxt->pOutput->stmtCtx.pTableDataBlocks = pCxt->pTableDataBlocks; + + pCxt->pVgroupsHashObj = NULL; + pCxt->pTableBlockHashObj = NULL; + pCxt->pSubTableHashObj = NULL; + pCxt->pTableDataBlocks = NULL; + + return TSDB_CODE_SUCCESS; + } + // merge according to vgId - if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { + if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); } return buildOutput(pCxt); @@ -1086,7 +1139,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false), .totalNum = 0, - .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT) + .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT), + .pStmtCb = pContext->pStmtCb }; if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || @@ -1094,6 +1148,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + if (pContext->pStmtCb) { + TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT); + } + *pQuery = taosMemoryCalloc(1, sizeof(SQuery)); if (NULL == *pQuery) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 088b25d5448665de9d84ae94c023ee8fef41e711..7d9adbd16ca680ee86c2c1054c80068a760bc877 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -506,6 +506,28 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p return TSDB_CODE_SUCCESS; } +int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize) { + size_t remain = pDataBlock->nAllocSize - pDataBlock->size; + uint32_t nAllocSizeOld = pDataBlock->nAllocSize; + + // expand the allocated size + if (remain < allSize) { + pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5; + + char *tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize); + if (tmp != NULL) { + pDataBlock->pData = tmp; + memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size); + } else { + // do nothing, if allocate more memory failed + pDataBlock->nAllocSize = nAllocSizeOld; + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) { size_t remain = pDataBlock->nAllocSize - pDataBlock->size; const int factor = 5; diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index ebe76cc1293f16783517c51e3e2b509be4849ca8..57f2d3801cfff80d08ace0d3257d137c9e9e8330 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -51,6 +51,98 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) { return code; } +int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { + SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; + SStmtDataCtx *pCtx = &modifyNode->stmtCtx; + STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid)); + if (NULL == pDataBlock) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + int32_t extendedRowSize = getExtendedRowSize(pDataBlock); + SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; + SRowBuilder* pBuilder = &pDataBlock->rowBuilder; + SMemParam param = {.rb = pBuilder}; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + + CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num); + + for (int32_t r = 0; r < bind->num; ++r) { + STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header + tdSRowResetBuf(pBuilder, row); + + // 1. set the parsed value from sql string + for (int c = 0; c < spd->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; + + param.schema = pColSchema; + getSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); + + if (bind[c].is_null && bind[c].is_null[r]) { + CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m)); + } else { + int32_t colLen = pColSchema->bytes; + if (IS_VAR_DATA_TYPE(pColSchema->type)) { + colLen = bind[c].length[r]; + } + + CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m)); + } + + if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { + TSKEY tsKey = TD_ROW_KEY(row); + checkTimestamp(pDataBlock, (const char *)&tsKey); + } + } + + // set the null value for the columns that do not assign values + if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { + for (int32_t i = 0; i < spd->numOfCols; ++i) { + if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE + tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i, + spd->cols[i].toffset); + } + } + } + + pDataBlock->size += extendedRowSize; + } + + SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData); + if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { + return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t qBuildStmtOutput(SQuery* pQuery) { + SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; + SStmtDataCtx *pCtx = &modifyNode->stmtCtx; + int32_t code = 0; + SInsertParseContext insertCtx = { + .pVgroupsHashObj = pCtx->pVgroupsHashObj; + .pTableBlockHashObj = pCtx->pTableBlockHashObj; + .pSubTableHashObj = pCtx->pSubTableHashObj; + .pTableDataBlocks = pCtx->pTableDataBlocks; + }; + + // merge according to vgId + if (taosHashGetSize(pCtx->pTableBlockHashObj) > 0) { + CHECK_CODE_GOTO(mergeTableDataBlocks(pCtx->pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return); + } + + code = buildOutput(&insertCtx); + +_return: + + destroyInsertParseContext(&insertCtx); + + return code; +} + + void qDestroyQuery(SQuery* pQueryNode) { if (NULL == pQueryNode) { return; @@ -68,4 +160,4 @@ void qDestroyQuery(SQuery* pQueryNode) { int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { return extractResultSchema(pRoot, numOfCols, pSchema); -} \ No newline at end of file +} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ded42365b6e6d188b23cb13751ce3eca68798ccd..6fe272de8af35e787be3a6c519d1197e125ca69f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -138,6 +138,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_STATUS_ERROR, "Stmt API usage error") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set") // mnode-common TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")