提交 9547c0a0 编写于 作者: D dapan1121

stmt

上级 e3ea1730
......@@ -280,16 +280,6 @@ typedef struct SVgDataBlocks {
char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SStmtDataCtx {
uint64_t tbUid;
uint64_t tbSuid;
int8_t tbType;
SParsedDataColInfo tags;
SHashObj* pVgroupsHashObj;
SHashObj* pTableBlockHashObj;
} SStmtDataCtx;
typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
......@@ -297,7 +287,6 @@ typedef struct SVnodeModifOpStmt {
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
SStmtDataCtx stmtCtx;
} SVnodeModifOpStmt;
typedef struct SExplainOptions {
......
......@@ -24,7 +24,10 @@ extern "C" {
typedef struct SStmtCallback {
TAOS_STMT* pStmt;
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
int32_t (*setBindInfoFn)(TAOS_STMT*, STableMeta*, void*);
int32_t (*setExecInfoFn)(TAOS_STMT*, SHashObj*, SHashObj*);
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
} SStmtCallback;
typedef struct SParseContext {
......
......@@ -132,7 +132,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222)
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223)
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224)
#define TSDB_CODE_TSC_STMT_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225)
#define TSDB_CODE_TSC_STMT_API_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225)
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227)
......
......@@ -38,27 +38,46 @@ typedef enum {
STMT_EXECUTE
} STMT_STATUS;
typedef struct SStmtTableCache {
STableDataBlocks* pDataBlock;
void* boundTags;
} SStmtTableCache;
typedef struct SStmtBindInfo {
bool needParse;
uint64_t tbUid;
uint64_t tbSuid;
int8_t tbType;
void* boundTags;
char* tbName;
SName sname;
TAOS_BIND* bindTags;
} SStmtBindInfo;
typedef struct SStmtExecInfo {
SRequestObj* pRequest;
SHashObj* pVgHash;
SHashObj* pBlockHash;
} SStmtExecInfo;
typedef struct STscStmt {
typedef struct SStmtSQLInfo {
STMT_TYPE type;
STMT_STATUS status;
bool autoCreate;
uint64_t runTimes;
STscObj* taos;
SCatalog* pCatalog;
SHashObj* pTableDataBlocks;
SHashObj* pVgList;
bool tbNeedParse;
bool tbReuse;
SRequestObj* pRequest;
SHashObj* pTableCache; //SHash<SStmtTableCache>
SQuery* pQuery;
char* sql;
char* sqlStr;
int32_t sqlLen;
char* tbName;
SName sname;
TAOS_BIND* bindTags;
} SStmtSQLInfo;
typedef struct STscStmt {
STscObj* taos;
SCatalog* pCatalog;
SStmtSQLInfo sql;
SStmtExecInfo exec;
SStmtBindInfo bind;
//SMultiTbStmt mtb;
//SNormalStmt normal;
......@@ -71,16 +90,19 @@ typedef struct STscStmt {
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define STMT_CHK_STATUS(_stmt, _status, _v) do {
switch (_status) {
#define STMT_SWITCH_STATUS(_stmt, _newstatus, _errcode) do {
switch (_newstatus) {
case STMT_INIT:
if ((_stmt)->status != 0) return (_v);
if ((_stmt)->status != 0) return (_errcode);
break;
case STMT_PREPARE:
if ((_stmt)->status != STMT_INIT) STMT_ERR_RET(_v);
if ((_stmt)->status != STMT_INIT) STMT_ERR_RET(_errcode);
break;
case STMT_SETTBNAME:
break;
default:
STMT_ERR_RET(_errcode);
break;
}
} while (0)
......
......@@ -283,7 +283,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code;
}
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code) {
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
if (TSDB_CODE_SUCCESS == code) {
......@@ -309,7 +309,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
}
taosArrayDestroy(pNodeList);
qDestroyQuery(pQuery);
if (!keepQuery) {
qDestroyQuery(pQuery);
}
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
}
......@@ -327,7 +330,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
code = parseSql(pRequest, false, &pQuery, NULL);
}
return launchQueryImpl(pRequest, pQuery, code);
return launchQueryImpl(pRequest, pQuery, code, false);
}
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
......
......@@ -7,43 +7,92 @@
int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->type = STMT_TYPE_MULTI_INSERT;
pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
if (NULL == pStmt->tbName) {
if (NULL == pStmt->bind.tbName) {
tscError("no table name set");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
}
*tbName = pStmt->tbName;
*tbName = pStmt->bind.tbName;
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->bind.tbUid = pTableMeta->uid;
pStmt->bind.tbSuid = pTableMeta->suid;
pStmt->bind.tbType = pTableMeta->tableType;
pStmt->bind.boundTags = tags;
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->exec.pVgHash = pVgHash;
pStmt->exec.pBlockHash = pBlockHash;
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
*pVgHash = pStmt->exec.pVgHash;
*pBlockHash = pStmt->exec.pBlockHash;
return TSDB_CODE_SUCCESS;
}
int32_t stmtParseSql(STscStmt* pStmt) {
SStmtCallback stmtCb = {.pStmt = pStmt, .getTbNameFn = stmtGetTbName};
SStmtCallback stmtCb = {
.pStmt = pStmt,
.getTbNameFn = stmtGetTbName,
.setBindInfoFn = stmtSetBindInfo,
.setExecInfoFn = stmtSetExecInfo,
.getExecInfoFn = stmtGetExecInfo,
};
STMT_ERR_RET(parseSql(pStmt->pRequest, false, &pStmt->pQuery, &stmtCb));
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
pStmt->tbNeedParse = false;
pStmt->bind.needParse = false;
switch (nodeType(pStmt->pQuery->pRoot)) {
switch (nodeType(pStmt->sql.pQuery->pRoot)) {
case QUERY_NODE_VNODE_MODIF_STMT:
if (0 == pStmt->type) {
pStmt->type = STMT_TYPE_INSERT;
if (0 == pStmt->sql.type) {
pStmt->sql.type = STMT_TYPE_INSERT;
}
break;
case QUERY_NODE_SELECT_STMT:
pStmt->type = STMT_TYPE_QUERY;
pStmt->sql.type = STMT_TYPE_QUERY;
break;
default:
tscError("not supported stmt type %d", nodeType(pStmt->pQuery->pRoot));
tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t stmtCloneBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) {
void stmtResetDataBlock(STableDataBlocks* pBlock) {
pBlock->pData = NULL;
pBlock->ordered = true;
pBlock->prevTS = INT64_MIN;
pBlock->size = sizeof(SSubmitBlk);
pBlock->tsSource = -1;
pBlock->numOfTables = 1;
pBlock->nAllocSize = 0;
pBlock->headerSize = pBlock->size;
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
}
int32_t stmtCloneDataBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) {
*pDst = (STableDataBlocks*)taosMemoryMalloc(sizeof(STableDataBlocks));
if (NULL == *pDst) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
......@@ -52,49 +101,96 @@ int32_t stmtCloneBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) {
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
(*pDst)->cloned = true;
(*pDst)->pData = NULL;
(*pDst)->ordered = true;
(*pDst)->prevTS = INT64_MIN;
(*pDst)->size = sizeof(SSubmitBlk);
(*pDst)->tsSource = -1;
stmtResetDataBlock(*pDst);
return TSDB_CODE_SUCCESS;
}
int32_t stmtSaveTableDataBlock(STscStmt *pStmt) {
if (pStmt->type != STMT_TYPE_MULTI_INSERT) {
return TSDB_CODE_SUCCESS;
void stmtFreeDataBlock(STableDataBlocks* pDataBlock) {
if (pDataBlock == NULL) {
return;
}
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
taosMemoryFreeClear(pDataBlock->pData);
taosMemoryFreeClear(pDataBlock);
}
int32_t stmtCacheBlock(STscStmt *pStmt) {
if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
return TSDB_CODE_SUCCESS;
}
uint64_t uid;
if (TSDB_CHILD_TABLE == pCtx->tbType) {
uid = pCtx->tbSuid;
if (TSDB_CHILD_TABLE == pStmt->bind.tbType) {
uid = pStmt->bind.tbSuid;
} else {
ASSERT(TSDB_NORMAL_TABLE == pCtx->tbType);
uid = pCtx->tbUid;
ASSERT(TSDB_NORMAL_TABLE == pStmt->bind.tbType);
uid = pStmt->bind.tbUid;
}
if (taosHashGet(pStmt->pTableDataBlocks, &uid, sizeof(uid))) {
if (taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid))) {
return TSDB_CODE_SUCCESS;
}
ASSERT(1 == taosHashGetSize(pStmt->pTableDataBlocks));
STableDataBlocks** pSrc = taosHashIterate(pStmt->pTableDataBlocks, NULL);
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid));
STableDataBlocks* pDst = NULL;
STMT_ERR_RET(stmtCloneBlock(&pDst, *pSrc));
STMT_ERR_RET(stmtCloneDataBlock(&pDst, *pSrc));
SStmtTableCache cache = {
.pDataBlock = pDst,
.boundTags = pStmt->bind.boundTags,
};
if (taosHashPut(pStmt->sql.pTableCache, &uid, sizeof(uid), &cache, sizeof(cache))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashPut(pStmt->pTableDataBlocks, &uid, sizeof(uid), &pDst, POINTER_BYTES);
pStmt->bind.boundTags = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t stmtHandleTbInCache(STscStmt* pStmt) {
if (NULL == pStmt->pTableDataBlocks || taosHashGetSize(pStmt->pTableDataBlocks) <= 0) {
int32_t stmtCleanExecCtx(STscStmt* pStmt, bool keepTable) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->sql.pQuery->pRoot;
void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
if (keepTable && (*(uint64_t*)taosHashGetKey(pIter, NULL) == pStmt->bind.tbUid)) {
taosMemoryFreeClear(pBlocks->pData);
stmtResetDataBlock(pBlocks);
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
continue;
}
stmtFreeDataBlock(pBlocks);
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
}
if (keepTable) {
return TSDB_CODE_SUCCESS;
}
taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
pStmt->bind.tbUid = 0;
pStmt->bind.tbSuid = 0;
pStmt->bind.tbType = 0;
destroyBoundColumnInfo(pStmt->bind.boundTags);
taosMemoryFreeClear(pStmt->bind.boundTags);
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetFromCache(STscStmt* pStmt) {
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -104,37 +200,46 @@ int32_t stmtHandleTbInCache(STscStmt* pStmt) {
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->sname, &pTableMeta));
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bind.sname, &pTableMeta));
if (pTableMeta->uid == pCtx->tbUid) {
pStmt->tbNeedParse = false;
pStmt->tbReuse = false;
if (pTableMeta->uid == pStmt->bind.tbUid) {
pStmt->bind.needParse = false;
return TSDB_CODE_SUCCESS;
}
if (taosHashGet(pCtx->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid))) {
pStmt->tbNeedParse = false;
pStmt->tbReuse = true;
pCtx->tbUid = pTableMeta->uid;
pCtx->tbSuid = pTableMeta->suid;
pCtx->tbType = pTableMeta->tableType;
if (taosHashGet(pStmt->exec.pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid))) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
if (NULL == pCache) {
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", pTableMeta->uid);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
pStmt->bind.needParse = false;
pStmt->bind.tbUid = pTableMeta->uid;
pStmt->bind.tbSuid = pTableMeta->suid;
pStmt->bind.tbType = pTableMeta->tableType;
pStmt->bind.boundTags = pCache->boundTags;
return TSDB_CODE_SUCCESS;
}
STableDataBlocks** pDataBlock = taosHashGet(pStmt->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid))
if (pDataBlock && *pDataBlock) {
pStmt->tbNeedParse = false;
pStmt->tbReuse = true;
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
if (pCache) {
pStmt->bind.needParse = false;
pStmt->bind.tbUid = pTableMeta->uid;
pStmt->bind.tbSuid = pTableMeta->suid;
pStmt->bind.tbType = pTableMeta->tableType;
pStmt->bind.boundTags = pCache->boundTags;
pCtx->tbUid = pTableMeta->uid;
pCtx->tbSuid = pTableMeta->suid;
pCtx->tbType = pTableMeta->tableType;
STableDataBlocks* pNewBlock = NULL;
STMT_ERR_RET(stmtCloneDataBlock(&pNewBlock, pCache->pDataBlock));
taosHashPut(pCtx->pTableBlockHashObj, &pCtx->tbUid, sizeof(pCtx->tbUid), pDataBlock, POINTER_BYTES);
if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid), &pNewBlock, POINTER_BYTES)) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
......@@ -153,17 +258,16 @@ TAOS_STMT *stmtInit(TAOS *taos) {
return NULL;
}
pStmt->pTableDataBlocks = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pStmt->pVgList = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pStmt->pTableDataBlocks || NULL == pStmt->pVgList) {
pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == pStmt->sql.pTableCache) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
taosMemoryFree(pStmt);
return NULL;
}
pStmt->taos = pObj;
pStmt->status = STMT_INIT;
pStmt->tbNeedParse = true;
pStmt->sql.status = STMT_INIT;
pStmt->bind.needParse = true;
return pStmt;
}
......@@ -171,10 +275,10 @@ TAOS_STMT *stmtInit(TAOS *taos) {
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_SWITCH_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_API_ERROR);
pStmt->sql = strndup(sql, length);
pStmt->sqlLen = length;
pStmt->sql.sqlStr = strndup(sql, length);
pStmt->sql.sqlLen = length;
return TSDB_CODE_SUCCESS;
}
......@@ -183,19 +287,19 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR);
taosMemoryFree(pStmt->tbName);
taosMemoryFree(pStmt->bind.tbName);
if (NULL == pStmt->pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest));
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(qCreateSName(&pStmt->sname, tbName, pStmt->taos->acctId, pStmt->pRequest->pDb, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen));
STMT_ERR_RET(qCreateSName(&pStmt->bind.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
pStmt->tbName = strdup(tbName);
pStmt->bind.tbName = strdup(tbName);
STMT_ERR_RET(stmtHandleTbInCache(pStmt));
STMT_ERR_RET(stmtGetFromCache(pStmt));
return TSDB_CODE_SUCCESS;
}
......@@ -203,11 +307,11 @@ int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
int stmtSetTbTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR);
if (pStmt->tbNeedParse) {
taosMemoryFree(pStmt->bindTags);
pStmt->bindTags = tags;
if (pStmt->bind.needParse) {
taosMemoryFree(pStmt->bind.bindTags);
pStmt->bind.bindTags = tags;
STMT_ERR_RET(stmtParseSql(pStmt));
} else {
......@@ -221,13 +325,24 @@ int stmtSetTbTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_FETCH_TAG_FIELDS, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_SWITCH_STATUS(stmt, STMT_FETCH_TAG_FIELDS, TSDB_CODE_TSC_STMT_API_ERROR);
if (pStmt->tbNeedParse) {
if (pStmt->bind.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(qBuildStmtTagFields(pStmt->pQuery, fieldNum, fields));
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query tag fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bind.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBuildStmtTagFields(pDataBlock, pStmt->bind.boundTags, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
......@@ -235,13 +350,24 @@ int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fiel
int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* fields) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_FETCH_COL_FIELDS, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_SWITCH_STATUS(stmt, STMT_FETCH_COL_FIELDS, TSDB_CODE_TSC_STMT_API_ERROR);
if (pStmt->tbNeedParse) {
if (pStmt->bind.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(qBuildStmtColFields(pStmt->pQuery, fieldNum, fields));
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bind.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBuildStmtColFields(pDataBlock, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
......@@ -249,21 +375,27 @@ int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* field
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_SWITCH_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_API_ERROR);
if (pStmt->tbNeedParse && pStmt->runTimes && pStmt->type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->type) {
pStmt->tbNeedParse = false;
if (pStmt->bind.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bind.needParse = false;
}
if (NULL == pStmt->pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest));
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->tbNeedParse) {
if (pStmt->bind.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bind.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
qBindStmtData(pStmt->pQuery, bind, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen);
qBindStmtData(pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
return TSDB_CODE_SUCCESS;
}
......@@ -272,9 +404,9 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
int stmtAddBatch(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_ADD_BATCH, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_SWITCH_STATUS(stmt, STMT_ADD_BATCH, TSDB_CODE_TSC_STMT_API_ERROR);
STMT_ERR_RET(stmtSaveTableDataBlock(pStmt));
STMT_ERR_RET(stmtCacheBlock(pStmt));
return TSDB_CODE_SUCCESS;
}
......@@ -283,23 +415,24 @@ int stmtExec(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
STMT_CHK_STATUS(stmt, STMT_EXECUTE, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_SWITCH_STATUS(stmt, STMT_EXECUTE, TSDB_CODE_TSC_STMT_API_ERROR);
STMT_ERR_RET(qBuildStmtOutput(pStmt->pQuery));
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
launchQueryImpl(pStmt->pRequest, pStmt->pQuery, TSDB_CODE_SUCCESS);
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);
STMT_ERR_JRET(pStmt->pRequest->code);
STMT_ERR_JRET(pStmt->exec.pRequest->code);
_return:
//TODO RESET AND CLEAN PART TO DATABLOCK...
stmtCleanExecCtx(pStmt, (code ? false : true));
taos_free_result(pStmt->pRequest);
pStmt->pRequest = NULL;
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
pStmt->tbNeedParse = true;
++pStmt->runTimes;
pStmt->bind.needParse = true;
++pStmt->sql.runTimes;
STMT_RET(code);
}
......@@ -316,7 +449,7 @@ char *stmtErrstr(TAOS_STMT *stmt) {
return (char*) tstrerror(terrno);
}
return taos_errstr(pStmt->pRequest);
return taos_errstr(pStmt->exec.pRequest);
}
int stmtAffectedRows(TAOS_STMT *stmt) {
......
......@@ -1100,17 +1100,17 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
}
if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
pCxt->pOutput->stmtCtx.tbUid = pCxt->pTableMeta->uid;
pCxt->pOutput->stmtCtx.tbSuid = pCxt->pTableMeta->suid;
pCxt->pOutput->stmtCtx.tbType = pCxt->pTableMeta->tableType;
pCxt->pOutput->stmtCtx.pVgroupsHashObj = pCxt->pVgroupsHashObj;
pCxt->pOutput->stmtCtx.pTableBlockHashObj = pCxt->pTableBlockHashObj;
pCxt->pOutput->stmtCtx.tags = pCxt->tags;
SParsedDataColInfo *tags = taosMemoryMalloc(sizeof(pCxt->tags));
if (NULL == tags) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setBindInfoFn)(pCxt->pTableMeta, tags);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
pCxt->pVgroupsHashObj = NULL;
pCxt->pTableBlockHashObj = NULL;
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
return TSDB_CODE_SUCCESS;
}
......@@ -1134,14 +1134,19 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
.pSql = (char*) pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pTableMeta = NULL,
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
.totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb
};
if (pContext->pStmtCb && *pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, &context.pTableBlockHashObj);
} else {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
}
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj ||
NULL == context.pSubTableHashObj || NULL == context.pOutput) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -103,6 +103,10 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs) {
}
void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
if (NULL == pColList) {
return;
}
taosMemoryFreeClear(pColList->boundColumns);
taosMemoryFreeClear(pColList->cols);
taosMemoryFreeClear(pColList->colIdxInfo);
......
......@@ -78,14 +78,7 @@ int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbNam
}
int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid));
if (NULL == pDataBlock) {
return TSDB_CODE_QRY_APP_ERROR;
}
int32_t qBindStmtData(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) {
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
......@@ -144,27 +137,23 @@ int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtOutput(SQuery* pQuery) {
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
int32_t code = 0;
SInsertParseContext insertCtx = {
.pVgroupsHashObj = pCtx->pVgroupsHashObj,
.pTableBlockHashObj = pCtx->pTableBlockHashObj,
.pVgroupsHashObj = pVgHash,
.pTableBlockHashObj = pBlockHash,
.pOutput = pQuery->pRoot
};
// merge according to vgId
if (taosHashGetSize(pCtx->pTableBlockHashObj) > 0) {
CHECK_CODE_GOTO(mergeTableDataBlocks(pCtx->pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return);
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
CHECK_CODE_GOTO(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return);
}
code = buildOutput(&insertCtx);
_return:
destroyInsertParseContext(&insertCtx);
CHECK_CODE(buildOutput(&insertCtx));
return code;
return TSDB_CODE_SUCCESS;
}
int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) {
......@@ -186,37 +175,28 @@ int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_
}
int32_t qBuildStmtTagFields(SQuery* pQuery, int32_t *fieldNum, TAOS_FIELD** fields) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid));
if (NULL == pDataBlock) {
int32_t qBuildStmtTagFields(STableDataBlocks *pDataBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) {
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR;
}
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
if (pCtx->tags.numOfBound <= 0) {
if (tags->numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(&pCtx->tags, pSchema, fieldNum, fields));
CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtColFields(SQuery* pQuery, int32_t *fieldNum, TAOS_FIELD** fields) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid));
if (NULL == pDataBlock) {
return TSDB_CODE_QRY_APP_ERROR;
}
int32_t qBuildStmtColFields(STableDataBlocks *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields) {
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
if (pCtx->tags.numOfBound <= 0) {
if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
......
......@@ -138,7 +138,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_STATUS_ERROR, "Stmt API usage error")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册