From e3ea173046f5620ad3e8b4e89327c96474ac4f0e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Apr 2022 20:10:51 +0800 Subject: [PATCH] stmt --- include/libs/nodes/querynodes.h | 10 +- include/util/taoserror.h | 1 + source/client/inc/clientStmt.h | 14 ++ source/client/src/clientImpl.c | 24 +-- source/client/src/clientMain.c | 15 +- source/client/src/clientStmt.c | 235 +++++++++++++++++++++++++++-- source/libs/parser/inc/parInt.h | 1 + source/libs/parser/src/parInsert.c | 38 ++--- source/libs/parser/src/parser.c | 93 +++++++++++- source/util/src/terror.c | 1 + 10 files changed, 382 insertions(+), 50 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 5e6c5233f7..b07568b0a0 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -282,10 +282,12 @@ typedef struct SVgDataBlocks { typedef struct SStmtDataCtx { uint64_t tbUid; - SHashObj* pVgroupsHashObj; // global - SHashObj* pTableBlockHashObj; // global - SHashObj* pSubTableHashObj; // global - SArray* pTableDataBlocks; // global + uint64_t tbSuid; + int8_t tbType; + SParsedDataColInfo tags; + + SHashObj* pVgroupsHashObj; + SHashObj* pTableBlockHashObj; } SStmtDataCtx; typedef struct SVnodeModifOpStmt { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 428dca96f8..ffd32a82d4 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -134,6 +134,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) #define TSDB_CODE_TSC_STMT_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225) #define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226) +#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227) // mnode-common #define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300) diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 3ff7c0006b..a8460a664c 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -30,22 +30,36 @@ typedef enum { STMT_INIT = 1, STMT_PREPARE, STMT_SETTBNAME, + STMT_FETCH_TAG_FIELDS, + STMT_FETCH_COL_FIELDS, STMT_BIND, STMT_BIND_COL, STMT_ADD_BATCH, STMT_EXECUTE } STMT_STATUS; + + typedef struct STscStmt { STMT_TYPE type; STMT_STATUS status; + bool autoCreate; + uint64_t runTimes; STscObj* taos; + SCatalog* pCatalog; + SHashObj* pTableDataBlocks; + SHashObj* pVgList; + + bool tbNeedParse; + bool tbReuse; SRequestObj* pRequest; SQuery* pQuery; char* sql; int32_t sqlLen; char* tbName; + SName sname; TAOS_BIND* bindTags; + //SMultiTbStmt mtb; //SNormalStmt normal; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 37597ba8e9..cb8d4ce42a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -283,16 +283,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList return pRequest->code; } -SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { - SRequestObj* pRequest = NULL; - SQuery* pQuery = NULL; +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code) { SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); - int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); - if (TSDB_CODE_SUCCESS == code) { - code = parseSql(pRequest, false, &pQuery, NULL); - } - if (TSDB_CODE_SUCCESS == code) { switch (pQuery->execMode) { case QUERY_EXEC_MODE_LOCAL: @@ -324,6 +317,19 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { return pRequest; } + +SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { + SRequestObj* pRequest = NULL; + SQuery* pQuery = NULL; + + int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); + if (TSDB_CODE_SUCCESS == code) { + code = parseSql(pRequest, false, &pQuery, NULL); + } + + return launchQueryImpl(pRequest, pQuery, code); +} + int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { SCatalog* pCatalog = NULL; int32_t code = 0; @@ -368,7 +374,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { int32_t code = 0; while (retryNum++ < REQUEST_MAX_TRY_TIMES) { - pRequest = execQueryImpl(pTscObj, sql, sqlLen); + pRequest = launchQuery(pTscObj, sql, sqlLen); if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 56132c3041..914f58c2e0 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -580,13 +580,22 @@ int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { } int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { - if (stmt == NULL || name == NULL || tags == NULL) { + if (stmt == NULL || name == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - return stmtSetTbNameTags(stmt, name, tags); + int32_t code = stmtSetTbName(stmt, name); + if (code) { + return code; + } + + if (tags) { + return stmtSetTbTags(stmt, tags); + } + + return TSDB_CODE_SUCCESS; } int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { @@ -596,7 +605,7 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { return terrno; } - return stmtSetTbNameTags(stmt, name, NULL); + return stmtSetTbName(stmt, name); } int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index a43ffdf31e..2549e52099 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -7,6 +7,8 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { STscStmt* pStmt = (STscStmt*)stmt; + pStmt->type = STMT_TYPE_MULTI_INSERT; + if (NULL == pStmt->tbName) { tscError("no table name set"); STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR); @@ -17,18 +19,151 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { return TSDB_CODE_SUCCESS; } +int32_t stmtParseSql(STscStmt* pStmt) { + SStmtCallback stmtCb = {.pStmt = pStmt, .getTbNameFn = stmtGetTbName}; + + STMT_ERR_RET(parseSql(pStmt->pRequest, false, &pStmt->pQuery, &stmtCb)); + + pStmt->tbNeedParse = false; + + switch (nodeType(pStmt->pQuery->pRoot)) { + case QUERY_NODE_VNODE_MODIF_STMT: + if (0 == pStmt->type) { + pStmt->type = STMT_TYPE_INSERT; + } + break; + case QUERY_NODE_SELECT_STMT: + pStmt->type = STMT_TYPE_QUERY; + break; + default: + tscError("not supported stmt type %d", nodeType(pStmt->pQuery->pRoot)); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtCloneBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) { + *pDst = (STableDataBlocks*)taosMemoryMalloc(sizeof(STableDataBlocks)); + if (NULL == *pDst) { + STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(*pDst, pSrc, sizeof(STableDataBlocks)); + (*pDst)->cloned = true; + + (*pDst)->pData = NULL; + (*pDst)->ordered = true; + (*pDst)->prevTS = INT64_MIN; + (*pDst)->size = sizeof(SSubmitBlk); + (*pDst)->tsSource = -1; + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtSaveTableDataBlock(STscStmt *pStmt) { + if (pStmt->type != STMT_TYPE_MULTI_INSERT) { + return TSDB_CODE_SUCCESS; + } + + SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->pQuery->pRoot; + SStmtDataCtx *pCtx = &modifyNode->stmtCtx; + + uint64_t uid; + if (TSDB_CHILD_TABLE == pCtx->tbType) { + uid = pCtx->tbSuid; + } else { + ASSERT(TSDB_NORMAL_TABLE == pCtx->tbType); + uid = pCtx->tbUid; + } + + if (taosHashGet(pStmt->pTableDataBlocks, &uid, sizeof(uid))) { + return TSDB_CODE_SUCCESS; + } + + ASSERT(1 == taosHashGetSize(pStmt->pTableDataBlocks)); + + STableDataBlocks** pSrc = taosHashIterate(pStmt->pTableDataBlocks, NULL); + STableDataBlocks* pDst = NULL; + + STMT_ERR_RET(stmtCloneBlock(&pDst, *pSrc)); + + taosHashPut(pStmt->pTableDataBlocks, &uid, sizeof(uid), &pDst, POINTER_BYTES); + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtHandleTbInCache(STscStmt* pStmt) { + if (NULL == pStmt->pTableDataBlocks || taosHashGetSize(pStmt->pTableDataBlocks) <= 0) { + return TSDB_CODE_SUCCESS; + } + + if (NULL == pStmt->pCatalog) { + STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog)); + } + + STableMeta *pTableMeta = NULL; + SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); + STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->sname, &pTableMeta)); + + SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->pQuery->pRoot; + SStmtDataCtx *pCtx = &modifyNode->stmtCtx; + + if (pTableMeta->uid == pCtx->tbUid) { + pStmt->tbNeedParse = false; + pStmt->tbReuse = false; + return TSDB_CODE_SUCCESS; + } + + if (taosHashGet(pCtx->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid))) { + pStmt->tbNeedParse = false; + pStmt->tbReuse = true; + pCtx->tbUid = pTableMeta->uid; + pCtx->tbSuid = pTableMeta->suid; + pCtx->tbType = pTableMeta->tableType; + + return TSDB_CODE_SUCCESS; + } + + STableDataBlocks** pDataBlock = taosHashGet(pStmt->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)) + if (pDataBlock && *pDataBlock) { + pStmt->tbNeedParse = false; + pStmt->tbReuse = true; + + pCtx->tbUid = pTableMeta->uid; + pCtx->tbSuid = pTableMeta->suid; + pCtx->tbType = pTableMeta->tableType; + + taosHashPut(pCtx->pTableBlockHashObj, &pCtx->tbUid, sizeof(pCtx->tbUid), pDataBlock, POINTER_BYTES); + + return TSDB_CODE_SUCCESS; + } + + return TSDB_CODE_SUCCESS; +} + + TAOS_STMT *stmtInit(TAOS *taos) { STscObj* pObj = (STscObj*)taos; STscStmt* pStmt = NULL; pStmt = taosMemoryCalloc(1, sizeof(STscStmt)); - if (pStmt == NULL) { + if (NULL == pStmt) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscError("failed to allocate memory for statement"); return NULL; } + + pStmt->pTableDataBlocks = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pStmt->pVgList = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == pStmt->pTableDataBlocks || NULL == pStmt->pVgList) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + taosMemoryFree(pStmt); + return NULL; + } + pStmt->taos = pObj; pStmt->status = STMT_INIT; + pStmt->tbNeedParse = true; return pStmt; } @@ -45,34 +180,89 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { } -int stmtSetTbNameTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { +int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { STscStmt* pStmt = (STscStmt*)stmt; STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR); - if (tbName) { - pStmt->tbName = strdup(tbName); + taosMemoryFree(pStmt->tbName); + + if (NULL == pStmt->pRequest) { + STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest)); } + + STMT_ERR_RET(qCreateSName(&pStmt->sname, tbName, pStmt->taos->acctId, pStmt->pRequest->pDb, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen)); + + pStmt->tbName = strdup(tbName); + + STMT_ERR_RET(stmtHandleTbInCache(pStmt)); + + return TSDB_CODE_SUCCESS; +} + +int stmtSetTbTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { + STscStmt* pStmt = (STscStmt*)stmt; - if (tags) { + STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR); + + if (pStmt->tbNeedParse) { + taosMemoryFree(pStmt->bindTags); pStmt->bindTags = tags; + + STMT_ERR_RET(stmtParseSql(pStmt)); + } else { + //TODO BIND TAG DATA + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) { + STscStmt* pStmt = (STscStmt*)stmt; + + STMT_CHK_STATUS(stmt, STMT_FETCH_TAG_FIELDS, TSDB_CODE_TSC_STMT_STATUS_ERROR); + + if (pStmt->tbNeedParse) { + STMT_ERR_RET(stmtParseSql(pStmt)); } + STMT_ERR_RET(qBuildStmtTagFields(pStmt->pQuery, fieldNum, fields)); + return TSDB_CODE_SUCCESS; } +int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* fields) { + STscStmt* pStmt = (STscStmt*)stmt; + + STMT_CHK_STATUS(stmt, STMT_FETCH_COL_FIELDS, TSDB_CODE_TSC_STMT_STATUS_ERROR); + + if (pStmt->tbNeedParse) { + STMT_ERR_RET(stmtParseSql(pStmt)); + } + + STMT_ERR_RET(qBuildStmtColFields(pStmt->pQuery, fieldNum, fields)); + + return TSDB_CODE_SUCCESS; +} + int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { STscStmt* pStmt = (STscStmt*)stmt; STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR); - if (NULL == pStmt->pRequest) { - SStmtCallback stmtCb = {.pStmt = stmt, .getTbNameFn = stmtGetTbName}; + if (pStmt->tbNeedParse && pStmt->runTimes && pStmt->type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->type) { + pStmt->tbNeedParse = false; + } + if (NULL == pStmt->pRequest) { STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest)); - STMT_ERR_RET(parseSql(pStmt->pRequest, false, &pStmt->pQuery, &stmtCb)); } + if (pStmt->tbNeedParse) { + STMT_ERR_RET(stmtParseSql(pStmt)); + } + qBindStmtData(pStmt->pQuery, bind, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen); return TSDB_CODE_SUCCESS; @@ -82,15 +272,36 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { int stmtAddBatch(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_CHK_STATUS(stmt, STMT_ADD_BATCH, TSDB_CODE_TSC_STMT_STATUS_ERROR); - qBuildStmtOutput(pStmt->pQuery); + STMT_ERR_RET(stmtSaveTableDataBlock(pStmt)); return TSDB_CODE_SUCCESS; } int stmtExec(TAOS_STMT *stmt) { - return TSDB_CODE_SUCCESS; + STscStmt* pStmt = (STscStmt*)stmt; + int32_t code = 0; + + STMT_CHK_STATUS(stmt, STMT_EXECUTE, TSDB_CODE_TSC_STMT_STATUS_ERROR); + + STMT_ERR_RET(qBuildStmtOutput(pStmt->pQuery)); + + launchQueryImpl(pStmt->pRequest, pStmt->pQuery, TSDB_CODE_SUCCESS); + + STMT_ERR_JRET(pStmt->pRequest->code); + +_return: + + //TODO RESET AND CLEAN PART TO DATABLOCK... + + taos_free_result(pStmt->pRequest); + pStmt->pRequest = NULL; + + pStmt->tbNeedParse = true; + ++pStmt->runTimes; + + STMT_RET(code); } diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index ea01caf9d7..8f88a05f77 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -27,6 +27,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); int32_t translate(SParseContext* pParseCxt, SQuery* pQuery); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); +int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, char* dbName, SMsgBuf* pMsgBuf); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 9be821d321..d069208b23 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -59,7 +59,6 @@ typedef struct SInsertParseContext { SHashObj* pVgroupsHashObj; // global SHashObj* pTableBlockHashObj; // global SHashObj* pSubTableHashObj; // global - SArray* pTableDataBlocks; // global SArray* pVgDataBlocks; // global int32_t totalNum; SVnodeModifOpStmt* pOutput; @@ -164,7 +163,7 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD return TSDB_CODE_SUCCESS; } -static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { +int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, char* dbName, SMsgBuf* pMsgBuf) { const char* msg1 = "name too long"; const char* msg2 = "invalid database name"; const char* msg3 = "db is not specified"; @@ -180,7 +179,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar strncpy(name, pTableName->z, dbLen); dbLen = strdequote(name); - code = tNameSetDbName(pName, pParseCtx->acctId, name, dbLen); + code = tNameSetDbName(pName, acctId, name, dbLen); if (code != TSDB_CODE_SUCCESS) { return buildInvalidOperationMsg(pMsgBuf, msg1); } @@ -205,11 +204,11 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar strncpy(name, pTableName->z, pTableName->n); strdequote(name); - if (pParseCtx->db == NULL) { + if (dbName == NULL) { return buildInvalidOperationMsg(pMsgBuf, msg3); } - code = tNameSetDbName(pName, pParseCtx->acctId, pParseCtx->db, strlen(pParseCtx->db)); + code = tNameSetDbName(pName, acctId, dbName, strlen(dbName)); if (code != TSDB_CODE_SUCCESS) { code = buildInvalidOperationMsg(pMsgBuf, msg2); return code; @@ -227,7 +226,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) { SParseContext* pBasicCtx = pCxt->pComCxt; SName name = {0}; - createSName(&name, pTname, pBasicCtx, &pCxt->msg); + createSName(&name, pTname, pBasicCtx->acctId, pBasicCtx->db, &pCxt->msg); if (isStb) { CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta)); } else { @@ -812,7 +811,7 @@ static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, S // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) { SName name; - createSName(&name, pTbnameToken, pCxt->pComCxt, &pCxt->msg); + createSName(&name, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&name, tbFName); int32_t len = strlen(tbFName); @@ -1009,7 +1008,6 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { taosHashCleanup(pCxt->pVgroupsHashObj); destroyBlockHashmap(pCxt->pTableBlockHashObj); - destroyBlockArrayList(pCxt->pTableDataBlocks); destroyBlockArrayList(pCxt->pVgDataBlocks); } @@ -1103,15 +1101,16 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { pCxt->pOutput->stmtCtx.tbUid = pCxt->pTableMeta->uid; + pCxt->pOutput->stmtCtx.tbSuid = pCxt->pTableMeta->suid; + pCxt->pOutput->stmtCtx.tbType = pCxt->pTableMeta->tableType; + pCxt->pOutput->stmtCtx.pVgroupsHashObj = pCxt->pVgroupsHashObj; pCxt->pOutput->stmtCtx.pTableBlockHashObj = pCxt->pTableBlockHashObj; - pCxt->pOutput->stmtCtx.pSubTableHashObj = pCxt->pSubTableHashObj; - pCxt->pOutput->stmtCtx.pTableDataBlocks = pCxt->pTableDataBlocks; + pCxt->pOutput->stmtCtx.tags = pCxt->tags; pCxt->pVgroupsHashObj = NULL; pCxt->pTableBlockHashObj = NULL; - pCxt->pSubTableHashObj = NULL; - pCxt->pTableDataBlocks = NULL; + memset(&pCxt->tags, 0, sizeof(pCxt->tags)); return TSDB_CODE_SUCCESS; } @@ -1152,14 +1151,17 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT); } - *pQuery = taosMemoryCalloc(1, sizeof(SQuery)); if (NULL == *pQuery) { - return TSDB_CODE_OUT_OF_MEMORY; + *pQuery = taosMemoryCalloc(1, sizeof(SQuery)); + if (NULL == *pQuery) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; + (*pQuery)->haveResultSet = false; + (*pQuery)->msgType = TDMT_VND_SUBMIT; + (*pQuery)->pRoot = (SNode*)context.pOutput; } - (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; - (*pQuery)->haveResultSet = false; - (*pQuery)->msgType = TDMT_VND_SUBMIT; - (*pQuery)->pRoot = (SNode*)context.pOutput; + context.pOutput->payloadType = PAYLOAD_TYPE_KV; int32_t code = skipInsertInto(&context); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 57f2d3801c..edd797f35b 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -51,6 +51,33 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) { return code; } +int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) { + SMsgBuf msg = {.buf = msgBuf, .len =msgBufLen}; + SToken sToken; + int32_t code = 0; + char *tbName = NULL; + + NEXT_TOKEN(pTableName, sToken); + + if (sToken.n == 0) { + return buildInvalidOperationMsg(&msg, "empty table name"); + } + + code = createSName(pName, &sToken, acctId, dbName, &msg); + if (code) { + return code; + } + + NEXT_TOKEN(pTableName, sToken); + + if (SToken.n > 0) { + return buildInvalidOperationMsg(&msg, "table name format is wrong"); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; SStmtDataCtx *pCtx = &modifyNode->stmtCtx; @@ -122,10 +149,8 @@ int32_t qBuildStmtOutput(SQuery* pQuery) { SStmtDataCtx *pCtx = &modifyNode->stmtCtx; int32_t code = 0; SInsertParseContext insertCtx = { - .pVgroupsHashObj = pCtx->pVgroupsHashObj; - .pTableBlockHashObj = pCtx->pTableBlockHashObj; - .pSubTableHashObj = pCtx->pSubTableHashObj; - .pTableDataBlocks = pCtx->pTableDataBlocks; + .pVgroupsHashObj = pCtx->pVgroupsHashObj, + .pTableBlockHashObj = pCtx->pTableBlockHashObj, }; // merge according to vgId @@ -142,6 +167,66 @@ _return: return code; } +int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) { + *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD)); + if (NULL == *fields) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < boundInfo->numOfBound; ++i) { + SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1]; + strcpy((*fields)[i].name, pTagSchema->name); + (*fields)[i].type = pTagSchema->type; + (*fields)[i].bytes = pTagSchema->bytes; + } + + *fieldNum = boundInfo->numOfBound; + + return TSDB_CODE_SUCCESS; +} + + +int32_t qBuildStmtTagFields(SQuery* pQuery, int32_t *fieldNum, TAOS_FIELD** fields) { + SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; + SStmtDataCtx *pCtx = &modifyNode->stmtCtx; + STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid)); + if (NULL == pDataBlock) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + if (pCtx->tags.numOfBound <= 0) { + *fieldNum = 0; + *fields = NULL; + + return TSDB_CODE_SUCCESS; + } + + CHECK_CODE(buildBoundFields(&pCtx->tags, pSchema, fieldNum, fields)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qBuildStmtColFields(SQuery* pQuery, int32_t *fieldNum, TAOS_FIELD** fields) { + SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; + SStmtDataCtx *pCtx = &modifyNode->stmtCtx; + STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid)); + if (NULL == pDataBlock) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + if (pCtx->tags.numOfBound <= 0) { + *fieldNum = 0; + *fields = NULL; + + return TSDB_CODE_SUCCESS; + } + + CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields)); + + return TSDB_CODE_SUCCESS; +} void qDestroyQuery(SQuery* pQueryNode) { if (NULL == pQueryNode) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6fe272de8a..139b0d30f5 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -140,6 +140,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_STATUS_ERROR, "Stmt API usage error") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause") // mnode-common TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error") -- GitLab