diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index b07568b0a0763a11ed656ed4ac7faaa71e7b5635..3407e597ff8a8df07284b1b05bbca3a36e952cd7 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -280,16 +280,6 @@ typedef struct SVgDataBlocks { char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ... } SVgDataBlocks; -typedef struct SStmtDataCtx { - uint64_t tbUid; - uint64_t tbSuid; - int8_t tbType; - SParsedDataColInfo tags; - - SHashObj* pVgroupsHashObj; - SHashObj* pTableBlockHashObj; -} SStmtDataCtx; - typedef struct SVnodeModifOpStmt { ENodeType nodeType; ENodeType sqlNodeType; @@ -297,7 +287,6 @@ typedef struct SVnodeModifOpStmt { uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] const char* sql; // current sql statement position - SStmtDataCtx stmtCtx; } SVnodeModifOpStmt; typedef struct SExplainOptions { diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index eea9da4e520978ea1265a5a50ecd20b56d5788c8..f0fa9776088eaec3e29c1db602bc082b277d221d 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -24,7 +24,10 @@ extern "C" { typedef struct SStmtCallback { TAOS_STMT* pStmt; - int32_t (*getTbNameFn)(TAOS_STMT*, char**); + int32_t (*getTbNameFn)(TAOS_STMT*, char**); + int32_t (*setBindInfoFn)(TAOS_STMT*, STableMeta*, void*); + int32_t (*setExecInfoFn)(TAOS_STMT*, SHashObj*, SHashObj*); + int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**); } SStmtCallback; typedef struct SParseContext { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ffd32a82d493c455ffe2823328c8ebfe6491f00b..b97279cca2986ef6cec0954e0ea94f65e355c2ba 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -132,7 +132,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) #define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) #define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) -#define TSDB_CODE_TSC_STMT_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225) +#define TSDB_CODE_TSC_STMT_API_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225) #define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226) #define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227) diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index a8460a664c854f60db0cfa3cc32b69a88630c89d..7fb23840fe7c84e07918ad09eea719f56ba23445 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -38,27 +38,46 @@ typedef enum { STMT_EXECUTE } STMT_STATUS; +typedef struct SStmtTableCache { + STableDataBlocks* pDataBlock; + void* boundTags; +} SStmtTableCache; + +typedef struct SStmtBindInfo { + bool needParse; + uint64_t tbUid; + uint64_t tbSuid; + int8_t tbType; + void* boundTags; + char* tbName; + SName sname; + TAOS_BIND* bindTags; +} SStmtBindInfo; +typedef struct SStmtExecInfo { + SRequestObj* pRequest; + SHashObj* pVgHash; + SHashObj* pBlockHash; +} SStmtExecInfo; -typedef struct STscStmt { +typedef struct SStmtSQLInfo { STMT_TYPE type; STMT_STATUS status; bool autoCreate; uint64_t runTimes; - STscObj* taos; - SCatalog* pCatalog; - SHashObj* pTableDataBlocks; - SHashObj* pVgList; - - bool tbNeedParse; - bool tbReuse; - SRequestObj* pRequest; + SHashObj* pTableCache; //SHash SQuery* pQuery; - char* sql; + char* sqlStr; int32_t sqlLen; - char* tbName; - SName sname; - TAOS_BIND* bindTags; +} SStmtSQLInfo; + +typedef struct STscStmt { + STscObj* taos; + SCatalog* pCatalog; + + SStmtSQLInfo sql; + SStmtExecInfo exec; + SStmtBindInfo bind; //SMultiTbStmt mtb; //SNormalStmt normal; @@ -71,16 +90,19 @@ typedef struct STscStmt { #define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -#define STMT_CHK_STATUS(_stmt, _status, _v) do { - switch (_status) { +#define STMT_SWITCH_STATUS(_stmt, _newstatus, _errcode) do { + switch (_newstatus) { case STMT_INIT: - if ((_stmt)->status != 0) return (_v); + if ((_stmt)->status != 0) return (_errcode); break; case STMT_PREPARE: - if ((_stmt)->status != STMT_INIT) STMT_ERR_RET(_v); + if ((_stmt)->status != STMT_INIT) STMT_ERR_RET(_errcode); break; case STMT_SETTBNAME: break; + default: + STMT_ERR_RET(_errcode); + break; } } while (0) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index cb8d4ce42aad6b71fd119601db146525cd02e83f..93e79dfebb6a036c09e56e9a61faa90f0b8a58a3 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -283,7 +283,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList return pRequest->code; } -SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code) { +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) { SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); if (TSDB_CODE_SUCCESS == code) { @@ -309,7 +309,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code } taosArrayDestroy(pNodeList); - qDestroyQuery(pQuery); + if (!keepQuery) { + qDestroyQuery(pQuery); + } + if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { pRequest->code = terrno; } @@ -327,7 +330,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { code = parseSql(pRequest, false, &pQuery, NULL); } - return launchQueryImpl(pRequest, pQuery, code); + return launchQueryImpl(pRequest, pQuery, code, false); } int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 2549e52099e3b25e0cf240004d3bffa463af0136..f2797adcc1499eda8a2af31fb145ee6979cf227a 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -7,43 +7,92 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { STscStmt* pStmt = (STscStmt*)stmt; - pStmt->type = STMT_TYPE_MULTI_INSERT; + pStmt->sql.type = STMT_TYPE_MULTI_INSERT; - if (NULL == pStmt->tbName) { + if (NULL == pStmt->bind.tbName) { tscError("no table name set"); STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR); } - *tbName = pStmt->tbName; + *tbName = pStmt->bind.tbName; + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) { + STscStmt* pStmt = (STscStmt*)stmt; + + pStmt->bind.tbUid = pTableMeta->uid; + pStmt->bind.tbSuid = pTableMeta->suid; + pStmt->bind.tbType = pTableMeta->tableType; + pStmt->bind.boundTags = tags; + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) { + STscStmt* pStmt = (STscStmt*)stmt; + + pStmt->exec.pVgHash = pVgHash; + pStmt->exec.pBlockHash = pBlockHash; + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) { + STscStmt* pStmt = (STscStmt*)stmt; + + *pVgHash = pStmt->exec.pVgHash; + *pBlockHash = pStmt->exec.pBlockHash; return TSDB_CODE_SUCCESS; } int32_t stmtParseSql(STscStmt* pStmt) { - SStmtCallback stmtCb = {.pStmt = pStmt, .getTbNameFn = stmtGetTbName}; + SStmtCallback stmtCb = { + .pStmt = pStmt, + .getTbNameFn = stmtGetTbName, + .setBindInfoFn = stmtSetBindInfo, + .setExecInfoFn = stmtSetExecInfo, + .getExecInfoFn = stmtGetExecInfo, + }; - STMT_ERR_RET(parseSql(pStmt->pRequest, false, &pStmt->pQuery, &stmtCb)); + STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb)); - pStmt->tbNeedParse = false; + pStmt->bind.needParse = false; - switch (nodeType(pStmt->pQuery->pRoot)) { + switch (nodeType(pStmt->sql.pQuery->pRoot)) { case QUERY_NODE_VNODE_MODIF_STMT: - if (0 == pStmt->type) { - pStmt->type = STMT_TYPE_INSERT; + if (0 == pStmt->sql.type) { + pStmt->sql.type = STMT_TYPE_INSERT; } break; case QUERY_NODE_SELECT_STMT: - pStmt->type = STMT_TYPE_QUERY; + pStmt->sql.type = STMT_TYPE_QUERY; break; default: - tscError("not supported stmt type %d", nodeType(pStmt->pQuery->pRoot)); + tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot)); STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR); } return TSDB_CODE_SUCCESS; } -int32_t stmtCloneBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) { +void stmtResetDataBlock(STableDataBlocks* pBlock) { + pBlock->pData = NULL; + pBlock->ordered = true; + pBlock->prevTS = INT64_MIN; + pBlock->size = sizeof(SSubmitBlk); + pBlock->tsSource = -1; + pBlock->numOfTables = 1; + pBlock->nAllocSize = 0; + pBlock->headerSize = pBlock->size; + + memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder)); +} + + +int32_t stmtCloneDataBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) { *pDst = (STableDataBlocks*)taosMemoryMalloc(sizeof(STableDataBlocks)); if (NULL == *pDst) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -52,49 +101,96 @@ int32_t stmtCloneBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) { memcpy(*pDst, pSrc, sizeof(STableDataBlocks)); (*pDst)->cloned = true; - (*pDst)->pData = NULL; - (*pDst)->ordered = true; - (*pDst)->prevTS = INT64_MIN; - (*pDst)->size = sizeof(SSubmitBlk); - (*pDst)->tsSource = -1; + stmtResetDataBlock(*pDst); return TSDB_CODE_SUCCESS; } -int32_t stmtSaveTableDataBlock(STscStmt *pStmt) { - if (pStmt->type != STMT_TYPE_MULTI_INSERT) { - return TSDB_CODE_SUCCESS; +void stmtFreeDataBlock(STableDataBlocks* pDataBlock) { + if (pDataBlock == NULL) { + return; } - SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->pQuery->pRoot; - SStmtDataCtx *pCtx = &modifyNode->stmtCtx; + taosMemoryFreeClear(pDataBlock->pData); + taosMemoryFreeClear(pDataBlock); +} + + +int32_t stmtCacheBlock(STscStmt *pStmt) { + if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) { + return TSDB_CODE_SUCCESS; + } uint64_t uid; - if (TSDB_CHILD_TABLE == pCtx->tbType) { - uid = pCtx->tbSuid; + if (TSDB_CHILD_TABLE == pStmt->bind.tbType) { + uid = pStmt->bind.tbSuid; } else { - ASSERT(TSDB_NORMAL_TABLE == pCtx->tbType); - uid = pCtx->tbUid; + ASSERT(TSDB_NORMAL_TABLE == pStmt->bind.tbType); + uid = pStmt->bind.tbUid; } - if (taosHashGet(pStmt->pTableDataBlocks, &uid, sizeof(uid))) { + if (taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid))) { return TSDB_CODE_SUCCESS; } - ASSERT(1 == taosHashGetSize(pStmt->pTableDataBlocks)); - - STableDataBlocks** pSrc = taosHashIterate(pStmt->pTableDataBlocks, NULL); + STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid)); STableDataBlocks* pDst = NULL; - STMT_ERR_RET(stmtCloneBlock(&pDst, *pSrc)); + STMT_ERR_RET(stmtCloneDataBlock(&pDst, *pSrc)); + + SStmtTableCache cache = { + .pDataBlock = pDst, + .boundTags = pStmt->bind.boundTags, + }; + + if (taosHashPut(pStmt->sql.pTableCache, &uid, sizeof(uid), &cache, sizeof(cache))) { + return TSDB_CODE_OUT_OF_MEMORY; + } - taosHashPut(pStmt->pTableDataBlocks, &uid, sizeof(uid), &pDst, POINTER_BYTES); + pStmt->bind.boundTags = NULL; return TSDB_CODE_SUCCESS; } -int32_t stmtHandleTbInCache(STscStmt* pStmt) { - if (NULL == pStmt->pTableDataBlocks || taosHashGetSize(pStmt->pTableDataBlocks) <= 0) { +int32_t stmtCleanExecCtx(STscStmt* pStmt, bool keepTable) { + SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->sql.pQuery->pRoot; + + void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); + while (pIter) { + STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter; + + if (keepTable && (*(uint64_t*)taosHashGetKey(pIter, NULL) == pStmt->bind.tbUid)) { + taosMemoryFreeClear(pBlocks->pData); + stmtResetDataBlock(pBlocks); + + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + continue; + } + + stmtFreeDataBlock(pBlocks); + + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + } + + if (keepTable) { + return TSDB_CODE_SUCCESS; + } + + taosHashCleanup(pStmt->exec.pBlockHash); + pStmt->exec.pBlockHash = NULL; + + pStmt->bind.tbUid = 0; + pStmt->bind.tbSuid = 0; + pStmt->bind.tbType = 0; + + destroyBoundColumnInfo(pStmt->bind.boundTags); + taosMemoryFreeClear(pStmt->bind.boundTags); + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtGetFromCache(STscStmt* pStmt) { + if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) { return TSDB_CODE_SUCCESS; } @@ -104,37 +200,46 @@ int32_t stmtHandleTbInCache(STscStmt* pStmt) { STableMeta *pTableMeta = NULL; SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); - STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->sname, &pTableMeta)); - - SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->pQuery->pRoot; - SStmtDataCtx *pCtx = &modifyNode->stmtCtx; + STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bind.sname, &pTableMeta)); - if (pTableMeta->uid == pCtx->tbUid) { - pStmt->tbNeedParse = false; - pStmt->tbReuse = false; + if (pTableMeta->uid == pStmt->bind.tbUid) { + pStmt->bind.needParse = false; + return TSDB_CODE_SUCCESS; } - if (taosHashGet(pCtx->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid))) { - pStmt->tbNeedParse = false; - pStmt->tbReuse = true; - pCtx->tbUid = pTableMeta->uid; - pCtx->tbSuid = pTableMeta->suid; - pCtx->tbType = pTableMeta->tableType; + if (taosHashGet(pStmt->exec.pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid))) { + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid)); + if (NULL == pCache) { + tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", pTableMeta->uid); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + pStmt->bind.needParse = false; + + pStmt->bind.tbUid = pTableMeta->uid; + pStmt->bind.tbSuid = pTableMeta->suid; + pStmt->bind.tbType = pTableMeta->tableType; + pStmt->bind.boundTags = pCache->boundTags; return TSDB_CODE_SUCCESS; } - STableDataBlocks** pDataBlock = taosHashGet(pStmt->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)) - if (pDataBlock && *pDataBlock) { - pStmt->tbNeedParse = false; - pStmt->tbReuse = true; + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid)); + if (pCache) { + pStmt->bind.needParse = false; + + pStmt->bind.tbUid = pTableMeta->uid; + pStmt->bind.tbSuid = pTableMeta->suid; + pStmt->bind.tbType = pTableMeta->tableType; + pStmt->bind.boundTags = pCache->boundTags; - pCtx->tbUid = pTableMeta->uid; - pCtx->tbSuid = pTableMeta->suid; - pCtx->tbType = pTableMeta->tableType; + STableDataBlocks* pNewBlock = NULL; + STMT_ERR_RET(stmtCloneDataBlock(&pNewBlock, pCache->pDataBlock)); - taosHashPut(pCtx->pTableBlockHashObj, &pCtx->tbUid, sizeof(pCtx->tbUid), pDataBlock, POINTER_BYTES); + if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid), &pNewBlock, POINTER_BYTES)) { + STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } return TSDB_CODE_SUCCESS; } @@ -153,17 +258,16 @@ TAOS_STMT *stmtInit(TAOS *taos) { return NULL; } - pStmt->pTableDataBlocks = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - pStmt->pVgList = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (NULL == pStmt->pTableDataBlocks || NULL == pStmt->pVgList) { + pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (NULL == pStmt->sql.pTableCache) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; taosMemoryFree(pStmt); return NULL; } pStmt->taos = pObj; - pStmt->status = STMT_INIT; - pStmt->tbNeedParse = true; + pStmt->sql.status = STMT_INIT; + pStmt->bind.needParse = true; return pStmt; } @@ -171,10 +275,10 @@ TAOS_STMT *stmtInit(TAOS *taos) { int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_CHK_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_SWITCH_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_API_ERROR); - pStmt->sql = strndup(sql, length); - pStmt->sqlLen = length; + pStmt->sql.sqlStr = strndup(sql, length); + pStmt->sql.sqlLen = length; return TSDB_CODE_SUCCESS; } @@ -183,19 +287,19 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR); - taosMemoryFree(pStmt->tbName); + taosMemoryFree(pStmt->bind.tbName); - if (NULL == pStmt->pRequest) { - STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest)); + if (NULL == pStmt->exec.pRequest) { + STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest)); } - STMT_ERR_RET(qCreateSName(&pStmt->sname, tbName, pStmt->taos->acctId, pStmt->pRequest->pDb, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen)); + STMT_ERR_RET(qCreateSName(&pStmt->bind.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); - pStmt->tbName = strdup(tbName); + pStmt->bind.tbName = strdup(tbName); - STMT_ERR_RET(stmtHandleTbInCache(pStmt)); + STMT_ERR_RET(stmtGetFromCache(pStmt)); return TSDB_CODE_SUCCESS; } @@ -203,11 +307,11 @@ int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { int stmtSetTbTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR); - if (pStmt->tbNeedParse) { - taosMemoryFree(pStmt->bindTags); - pStmt->bindTags = tags; + if (pStmt->bind.needParse) { + taosMemoryFree(pStmt->bind.bindTags); + pStmt->bind.bindTags = tags; STMT_ERR_RET(stmtParseSql(pStmt)); } else { @@ -221,13 +325,24 @@ int stmtSetTbTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_CHK_STATUS(stmt, STMT_FETCH_TAG_FIELDS, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_SWITCH_STATUS(stmt, STMT_FETCH_TAG_FIELDS, TSDB_CODE_TSC_STMT_API_ERROR); - if (pStmt->tbNeedParse) { + if (pStmt->bind.needParse) { STMT_ERR_RET(stmtParseSql(pStmt)); } - STMT_ERR_RET(qBuildStmtTagFields(pStmt->pQuery, fieldNum, fields)); + if (STMT_TYPE_QUERY == pStmt->sql.type) { + tscError("invalid operation to get query tag fileds"); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); + } + + STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid)); + if (NULL == pDataBlock) { + tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bind.tbUid); + STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + STMT_ERR_RET(qBuildStmtTagFields(pDataBlock, pStmt->bind.boundTags, fieldNum, fields)); return TSDB_CODE_SUCCESS; } @@ -235,13 +350,24 @@ int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fiel int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* fields) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_CHK_STATUS(stmt, STMT_FETCH_COL_FIELDS, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_SWITCH_STATUS(stmt, STMT_FETCH_COL_FIELDS, TSDB_CODE_TSC_STMT_API_ERROR); - if (pStmt->tbNeedParse) { + if (pStmt->bind.needParse) { STMT_ERR_RET(stmtParseSql(pStmt)); } - STMT_ERR_RET(qBuildStmtColFields(pStmt->pQuery, fieldNum, fields)); + if (STMT_TYPE_QUERY == pStmt->sql.type) { + tscError("invalid operation to get query column fileds"); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); + } + + STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid)); + if (NULL == pDataBlock) { + tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bind.tbUid); + STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + STMT_ERR_RET(qBuildStmtColFields(pDataBlock, fieldNum, fields)); return TSDB_CODE_SUCCESS; } @@ -249,21 +375,27 @@ int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* field int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_SWITCH_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_API_ERROR); - if (pStmt->tbNeedParse && pStmt->runTimes && pStmt->type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->type) { - pStmt->tbNeedParse = false; + if (pStmt->bind.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { + pStmt->bind.needParse = false; } - if (NULL == pStmt->pRequest) { - STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest)); + if (NULL == pStmt->exec.pRequest) { + STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest)); } - if (pStmt->tbNeedParse) { + if (pStmt->bind.needParse) { STMT_ERR_RET(stmtParseSql(pStmt)); } + + STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid)); + if (NULL == pDataBlock) { + tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bind.tbUid); + STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } - qBindStmtData(pStmt->pQuery, bind, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen); + qBindStmtData(pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); return TSDB_CODE_SUCCESS; } @@ -272,9 +404,9 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { int stmtAddBatch(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_CHK_STATUS(stmt, STMT_ADD_BATCH, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_SWITCH_STATUS(stmt, STMT_ADD_BATCH, TSDB_CODE_TSC_STMT_API_ERROR); - STMT_ERR_RET(stmtSaveTableDataBlock(pStmt)); + STMT_ERR_RET(stmtCacheBlock(pStmt)); return TSDB_CODE_SUCCESS; } @@ -283,23 +415,24 @@ int stmtExec(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; int32_t code = 0; - STMT_CHK_STATUS(stmt, STMT_EXECUTE, TSDB_CODE_TSC_STMT_STATUS_ERROR); + STMT_SWITCH_STATUS(stmt, STMT_EXECUTE, TSDB_CODE_TSC_STMT_API_ERROR); - STMT_ERR_RET(qBuildStmtOutput(pStmt->pQuery)); + STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash)); - launchQueryImpl(pStmt->pRequest, pStmt->pQuery, TSDB_CODE_SUCCESS); + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true); - STMT_ERR_JRET(pStmt->pRequest->code); + STMT_ERR_JRET(pStmt->exec.pRequest->code); _return: - //TODO RESET AND CLEAN PART TO DATABLOCK... + stmtCleanExecCtx(pStmt, (code ? false : true)); - taos_free_result(pStmt->pRequest); - pStmt->pRequest = NULL; + taos_free_result(pStmt->exec.pRequest); + pStmt->exec.pRequest = NULL; - pStmt->tbNeedParse = true; - ++pStmt->runTimes; + pStmt->bind.needParse = true; + + ++pStmt->sql.runTimes; STMT_RET(code); } @@ -316,7 +449,7 @@ char *stmtErrstr(TAOS_STMT *stmt) { return (char*) tstrerror(terrno); } - return taos_errstr(pStmt->pRequest); + return taos_errstr(pStmt->exec.pRequest); } int stmtAffectedRows(TAOS_STMT *stmt) { diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index d069208b23d0a0d4241d654e431c77495448340e..6b7109fea3322b00285b31c56f1776f731d705a8 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1100,17 +1100,17 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { } if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { - pCxt->pOutput->stmtCtx.tbUid = pCxt->pTableMeta->uid; - pCxt->pOutput->stmtCtx.tbSuid = pCxt->pTableMeta->suid; - pCxt->pOutput->stmtCtx.tbType = pCxt->pTableMeta->tableType; - - pCxt->pOutput->stmtCtx.pVgroupsHashObj = pCxt->pVgroupsHashObj; - pCxt->pOutput->stmtCtx.pTableBlockHashObj = pCxt->pTableBlockHashObj; - pCxt->pOutput->stmtCtx.tags = pCxt->tags; + SParsedDataColInfo *tags = taosMemoryMalloc(sizeof(pCxt->tags)); + if (NULL == tags) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); + (*pCxt->pStmtCb->setBindInfoFn)(pCxt->pTableMeta, tags); + memset(&pCxt->tags, 0, sizeof(pCxt->tags)); + (*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); pCxt->pVgroupsHashObj = NULL; pCxt->pTableBlockHashObj = NULL; - memset(&pCxt->tags, 0, sizeof(pCxt->tags)); return TSDB_CODE_SUCCESS; } @@ -1134,14 +1134,19 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { .pSql = (char*) pContext->pSql, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, .pTableMeta = NULL, - .pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false), - .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false), .totalNum = 0, .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT), .pStmtCb = pContext->pStmtCb }; + if (pContext->pStmtCb && *pQuery) { + (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, &context.pTableBlockHashObj); + } else { + context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false), + context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), + } + if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj || NULL == context.pOutput) { return TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 7d9adbd16ca680ee86c2c1054c80068a760bc877..c601acf7bdba2fc965764ffbeaf8aad2c6b30b59 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -103,6 +103,10 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs) { } void destroyBoundColumnInfo(SParsedDataColInfo* pColList) { + if (NULL == pColList) { + return; + } + taosMemoryFreeClear(pColList->boundColumns); taosMemoryFreeClear(pColList->cols); taosMemoryFreeClear(pColList->colIdxInfo); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index edd797f35b46b0e1b8bdae07a46fbd5dc180491c..68e9961b6d67777ab12da11bc93dd00fb9e1ccba 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -78,14 +78,7 @@ int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbNam } -int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { - SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; - SStmtDataCtx *pCtx = &modifyNode->stmtCtx; - STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid)); - if (NULL == pDataBlock) { - return TSDB_CODE_QRY_APP_ERROR; - } - +int32_t qBindStmtData(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); int32_t extendedRowSize = getExtendedRowSize(pDataBlock); SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; @@ -144,27 +137,23 @@ int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32 return TSDB_CODE_SUCCESS; } -int32_t qBuildStmtOutput(SQuery* pQuery) { +int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; - SStmtDataCtx *pCtx = &modifyNode->stmtCtx; int32_t code = 0; SInsertParseContext insertCtx = { - .pVgroupsHashObj = pCtx->pVgroupsHashObj, - .pTableBlockHashObj = pCtx->pTableBlockHashObj, + .pVgroupsHashObj = pVgHash, + .pTableBlockHashObj = pBlockHash, + .pOutput = pQuery->pRoot }; // merge according to vgId - if (taosHashGetSize(pCtx->pTableBlockHashObj) > 0) { - CHECK_CODE_GOTO(mergeTableDataBlocks(pCtx->pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return); + if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { + CHECK_CODE_GOTO(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return); } - - code = buildOutput(&insertCtx); - -_return: - destroyInsertParseContext(&insertCtx); + CHECK_CODE(buildOutput(&insertCtx)); - return code; + return TSDB_CODE_SUCCESS; } int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) { @@ -186,37 +175,28 @@ int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_ } -int32_t qBuildStmtTagFields(SQuery* pQuery, int32_t *fieldNum, TAOS_FIELD** fields) { - SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; - SStmtDataCtx *pCtx = &modifyNode->stmtCtx; - STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid)); - if (NULL == pDataBlock) { +int32_t qBuildStmtTagFields(STableDataBlocks *pDataBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) { + SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + if (NULL == tags) { return TSDB_CODE_QRY_APP_ERROR; } SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); - if (pCtx->tags.numOfBound <= 0) { + if (tags->numOfBound <= 0) { *fieldNum = 0; *fields = NULL; return TSDB_CODE_SUCCESS; } - CHECK_CODE(buildBoundFields(&pCtx->tags, pSchema, fieldNum, fields)); + CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields)); return TSDB_CODE_SUCCESS; } -int32_t qBuildStmtColFields(SQuery* pQuery, int32_t *fieldNum, TAOS_FIELD** fields) { - SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; - SStmtDataCtx *pCtx = &modifyNode->stmtCtx; - STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid)); - if (NULL == pDataBlock) { - return TSDB_CODE_QRY_APP_ERROR; - } - +int32_t qBuildStmtColFields(STableDataBlocks *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields) { SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - if (pCtx->tags.numOfBound <= 0) { + if (pDataBlock->boundColumnInfo.numOfBound <= 0) { *fieldNum = 0; *fields = NULL; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 139b0d30f58c0fdde3325d3be0ea8a4612ffb2b9..9332cb481ecac5ee6febb31da97f4986a17e74f3 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -138,7 +138,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_STATUS_ERROR, "Stmt API usage error") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")