提交 e3ea1730 编写于 作者: D dapan1121

stmt

上级 69f09fd0
...@@ -282,10 +282,12 @@ typedef struct SVgDataBlocks { ...@@ -282,10 +282,12 @@ typedef struct SVgDataBlocks {
typedef struct SStmtDataCtx { typedef struct SStmtDataCtx {
uint64_t tbUid; uint64_t tbUid;
SHashObj* pVgroupsHashObj; // global uint64_t tbSuid;
SHashObj* pTableBlockHashObj; // global int8_t tbType;
SHashObj* pSubTableHashObj; // global SParsedDataColInfo tags;
SArray* pTableDataBlocks; // global
SHashObj* pVgroupsHashObj;
SHashObj* pTableBlockHashObj;
} SStmtDataCtx; } SStmtDataCtx;
typedef struct SVnodeModifOpStmt { typedef struct SVnodeModifOpStmt {
......
...@@ -134,6 +134,7 @@ int32_t* taosGetErrno(); ...@@ -134,6 +134,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) #define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224)
#define TSDB_CODE_TSC_STMT_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225) #define TSDB_CODE_TSC_STMT_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225)
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226) #define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227)
// mnode-common // mnode-common
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300) #define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
......
...@@ -30,22 +30,36 @@ typedef enum { ...@@ -30,22 +30,36 @@ typedef enum {
STMT_INIT = 1, STMT_INIT = 1,
STMT_PREPARE, STMT_PREPARE,
STMT_SETTBNAME, STMT_SETTBNAME,
STMT_FETCH_TAG_FIELDS,
STMT_FETCH_COL_FIELDS,
STMT_BIND, STMT_BIND,
STMT_BIND_COL, STMT_BIND_COL,
STMT_ADD_BATCH, STMT_ADD_BATCH,
STMT_EXECUTE STMT_EXECUTE
} STMT_STATUS; } STMT_STATUS;
typedef struct STscStmt { typedef struct STscStmt {
STMT_TYPE type; STMT_TYPE type;
STMT_STATUS status; STMT_STATUS status;
bool autoCreate;
uint64_t runTimes;
STscObj* taos; STscObj* taos;
SCatalog* pCatalog;
SHashObj* pTableDataBlocks;
SHashObj* pVgList;
bool tbNeedParse;
bool tbReuse;
SRequestObj* pRequest; SRequestObj* pRequest;
SQuery* pQuery; SQuery* pQuery;
char* sql; char* sql;
int32_t sqlLen; int32_t sqlLen;
char* tbName; char* tbName;
SName sname;
TAOS_BIND* bindTags; TAOS_BIND* bindTags;
//SMultiTbStmt mtb; //SMultiTbStmt mtb;
//SNormalStmt normal; //SNormalStmt normal;
......
...@@ -283,16 +283,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -283,16 +283,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code; return pRequest->code;
} }
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = parseSql(pRequest, false, &pQuery, NULL);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
switch (pQuery->execMode) { switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL: case QUERY_EXEC_MODE_LOCAL:
...@@ -324,6 +317,19 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -324,6 +317,19 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
return pRequest; return pRequest;
} }
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = parseSql(pRequest, false, &pQuery, NULL);
}
return launchQueryImpl(pRequest, pQuery, code);
}
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -368,7 +374,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -368,7 +374,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t code = 0; int32_t code = 0;
while (retryNum++ < REQUEST_MAX_TRY_TIMES) { while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
pRequest = execQueryImpl(pTscObj, sql, sqlLen); pRequest = launchQuery(pTscObj, sql, sqlLen);
if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break; break;
} }
......
...@@ -580,13 +580,22 @@ int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { ...@@ -580,13 +580,22 @@ int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
} }
int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) {
if (stmt == NULL || name == NULL || tags == NULL) { if (stmt == NULL || name == NULL) {
tscError("NULL parameter for %s", __FUNCTION__); tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
return stmtSetTbNameTags(stmt, name, tags); int32_t code = stmtSetTbName(stmt, name);
if (code) {
return code;
}
if (tags) {
return stmtSetTbTags(stmt, tags);
}
return TSDB_CODE_SUCCESS;
} }
int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
...@@ -596,7 +605,7 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { ...@@ -596,7 +605,7 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
return terrno; return terrno;
} }
return stmtSetTbNameTags(stmt, name, NULL); return stmtSetTbName(stmt, name);
} }
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) { int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) {
......
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
pStmt->type = STMT_TYPE_MULTI_INSERT;
if (NULL == pStmt->tbName) { if (NULL == pStmt->tbName) {
tscError("no table name set"); tscError("no table name set");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR); STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
...@@ -17,18 +19,151 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { ...@@ -17,18 +19,151 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t stmtParseSql(STscStmt* pStmt) {
SStmtCallback stmtCb = {.pStmt = pStmt, .getTbNameFn = stmtGetTbName};
STMT_ERR_RET(parseSql(pStmt->pRequest, false, &pStmt->pQuery, &stmtCb));
pStmt->tbNeedParse = false;
switch (nodeType(pStmt->pQuery->pRoot)) {
case QUERY_NODE_VNODE_MODIF_STMT:
if (0 == pStmt->type) {
pStmt->type = STMT_TYPE_INSERT;
}
break;
case QUERY_NODE_SELECT_STMT:
pStmt->type = STMT_TYPE_QUERY;
break;
default:
tscError("not supported stmt type %d", nodeType(pStmt->pQuery->pRoot));
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t stmtCloneBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) {
*pDst = (STableDataBlocks*)taosMemoryMalloc(sizeof(STableDataBlocks));
if (NULL == *pDst) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
(*pDst)->cloned = true;
(*pDst)->pData = NULL;
(*pDst)->ordered = true;
(*pDst)->prevTS = INT64_MIN;
(*pDst)->size = sizeof(SSubmitBlk);
(*pDst)->tsSource = -1;
return TSDB_CODE_SUCCESS;
}
int32_t stmtSaveTableDataBlock(STscStmt *pStmt) {
if (pStmt->type != STMT_TYPE_MULTI_INSERT) {
return TSDB_CODE_SUCCESS;
}
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
uint64_t uid;
if (TSDB_CHILD_TABLE == pCtx->tbType) {
uid = pCtx->tbSuid;
} else {
ASSERT(TSDB_NORMAL_TABLE == pCtx->tbType);
uid = pCtx->tbUid;
}
if (taosHashGet(pStmt->pTableDataBlocks, &uid, sizeof(uid))) {
return TSDB_CODE_SUCCESS;
}
ASSERT(1 == taosHashGetSize(pStmt->pTableDataBlocks));
STableDataBlocks** pSrc = taosHashIterate(pStmt->pTableDataBlocks, NULL);
STableDataBlocks* pDst = NULL;
STMT_ERR_RET(stmtCloneBlock(&pDst, *pSrc));
taosHashPut(pStmt->pTableDataBlocks, &uid, sizeof(uid), &pDst, POINTER_BYTES);
return TSDB_CODE_SUCCESS;
}
int32_t stmtHandleTbInCache(STscStmt* pStmt) {
if (NULL == pStmt->pTableDataBlocks || taosHashGetSize(pStmt->pTableDataBlocks) <= 0) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pStmt->pCatalog) {
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
}
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->sname, &pTableMeta));
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
if (pTableMeta->uid == pCtx->tbUid) {
pStmt->tbNeedParse = false;
pStmt->tbReuse = false;
return TSDB_CODE_SUCCESS;
}
if (taosHashGet(pCtx->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid))) {
pStmt->tbNeedParse = false;
pStmt->tbReuse = true;
pCtx->tbUid = pTableMeta->uid;
pCtx->tbSuid = pTableMeta->suid;
pCtx->tbType = pTableMeta->tableType;
return TSDB_CODE_SUCCESS;
}
STableDataBlocks** pDataBlock = taosHashGet(pStmt->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid))
if (pDataBlock && *pDataBlock) {
pStmt->tbNeedParse = false;
pStmt->tbReuse = true;
pCtx->tbUid = pTableMeta->uid;
pCtx->tbSuid = pTableMeta->suid;
pCtx->tbType = pTableMeta->tableType;
taosHashPut(pCtx->pTableBlockHashObj, &pCtx->tbUid, sizeof(pCtx->tbUid), pDataBlock, POINTER_BYTES);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
}
TAOS_STMT *stmtInit(TAOS *taos) { TAOS_STMT *stmtInit(TAOS *taos) {
STscObj* pObj = (STscObj*)taos; STscObj* pObj = (STscObj*)taos;
STscStmt* pStmt = NULL; STscStmt* pStmt = NULL;
pStmt = taosMemoryCalloc(1, sizeof(STscStmt)); pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
if (pStmt == NULL) { if (NULL == pStmt) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to allocate memory for statement");
return NULL; return NULL;
} }
pStmt->pTableDataBlocks = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pStmt->pVgList = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pStmt->pTableDataBlocks || NULL == pStmt->pVgList) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
taosMemoryFree(pStmt);
return NULL;
}
pStmt->taos = pObj; pStmt->taos = pObj;
pStmt->status = STMT_INIT; pStmt->status = STMT_INIT;
pStmt->tbNeedParse = true;
return pStmt; return pStmt;
} }
...@@ -45,34 +180,89 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { ...@@ -45,34 +180,89 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
} }
int stmtSetTbNameTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR); STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR);
if (tbName) { taosMemoryFree(pStmt->tbName);
pStmt->tbName = strdup(tbName);
if (NULL == pStmt->pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest));
} }
STMT_ERR_RET(qCreateSName(&pStmt->sname, tbName, pStmt->taos->acctId, pStmt->pRequest->pDb, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen));
pStmt->tbName = strdup(tbName);
STMT_ERR_RET(stmtHandleTbInCache(pStmt));
return TSDB_CODE_SUCCESS;
}
int stmtSetTbTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
STscStmt* pStmt = (STscStmt*)stmt;
if (tags) { STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR);
if (pStmt->tbNeedParse) {
taosMemoryFree(pStmt->bindTags);
pStmt->bindTags = tags; pStmt->bindTags = tags;
STMT_ERR_RET(stmtParseSql(pStmt));
} else {
//TODO BIND TAG DATA
}
return TSDB_CODE_SUCCESS;
}
int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_FETCH_TAG_FIELDS, TSDB_CODE_TSC_STMT_STATUS_ERROR);
if (pStmt->tbNeedParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
} }
STMT_ERR_RET(qBuildStmtTagFields(pStmt->pQuery, fieldNum, fields));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* fields) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_FETCH_COL_FIELDS, TSDB_CODE_TSC_STMT_STATUS_ERROR);
if (pStmt->tbNeedParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(qBuildStmtColFields(pStmt->pQuery, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR); STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR);
if (NULL == pStmt->pRequest) { if (pStmt->tbNeedParse && pStmt->runTimes && pStmt->type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->type) {
SStmtCallback stmtCb = {.pStmt = stmt, .getTbNameFn = stmtGetTbName}; pStmt->tbNeedParse = false;
}
if (NULL == pStmt->pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest)); STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest));
STMT_ERR_RET(parseSql(pStmt->pRequest, false, &pStmt->pQuery, &stmtCb));
} }
if (pStmt->tbNeedParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
qBindStmtData(pStmt->pQuery, bind, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen); qBindStmtData(pStmt->pQuery, bind, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -82,15 +272,36 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { ...@@ -82,15 +272,36 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
int stmtAddBatch(TAOS_STMT *stmt) { int stmtAddBatch(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR); STMT_CHK_STATUS(stmt, STMT_ADD_BATCH, TSDB_CODE_TSC_STMT_STATUS_ERROR);
qBuildStmtOutput(pStmt->pQuery); STMT_ERR_RET(stmtSaveTableDataBlock(pStmt));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int stmtExec(TAOS_STMT *stmt) { int stmtExec(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS; STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
STMT_CHK_STATUS(stmt, STMT_EXECUTE, TSDB_CODE_TSC_STMT_STATUS_ERROR);
STMT_ERR_RET(qBuildStmtOutput(pStmt->pQuery));
launchQueryImpl(pStmt->pRequest, pStmt->pQuery, TSDB_CODE_SUCCESS);
STMT_ERR_JRET(pStmt->pRequest->code);
_return:
//TODO RESET AND CLEAN PART TO DATABLOCK...
taos_free_result(pStmt->pRequest);
pStmt->pRequest = NULL;
pStmt->tbNeedParse = true;
++pStmt->runTimes;
STMT_RET(code);
} }
......
...@@ -27,6 +27,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); ...@@ -27,6 +27,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery); int32_t translate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, char* dbName, SMsgBuf* pMsgBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -59,7 +59,6 @@ typedef struct SInsertParseContext { ...@@ -59,7 +59,6 @@ typedef struct SInsertParseContext {
SHashObj* pVgroupsHashObj; // global SHashObj* pVgroupsHashObj; // global
SHashObj* pTableBlockHashObj; // global SHashObj* pTableBlockHashObj; // global
SHashObj* pSubTableHashObj; // global SHashObj* pSubTableHashObj; // global
SArray* pTableDataBlocks; // global
SArray* pVgDataBlocks; // global SArray* pVgDataBlocks; // global
int32_t totalNum; int32_t totalNum;
SVnodeModifOpStmt* pOutput; SVnodeModifOpStmt* pOutput;
...@@ -164,7 +163,7 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD ...@@ -164,7 +163,7 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, char* dbName, SMsgBuf* pMsgBuf) {
const char* msg1 = "name too long"; const char* msg1 = "name too long";
const char* msg2 = "invalid database name"; const char* msg2 = "invalid database name";
const char* msg3 = "db is not specified"; const char* msg3 = "db is not specified";
...@@ -180,7 +179,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar ...@@ -180,7 +179,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar
strncpy(name, pTableName->z, dbLen); strncpy(name, pTableName->z, dbLen);
dbLen = strdequote(name); dbLen = strdequote(name);
code = tNameSetDbName(pName, pParseCtx->acctId, name, dbLen); code = tNameSetDbName(pName, acctId, name, dbLen);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
} }
...@@ -205,11 +204,11 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar ...@@ -205,11 +204,11 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar
strncpy(name, pTableName->z, pTableName->n); strncpy(name, pTableName->z, pTableName->n);
strdequote(name); strdequote(name);
if (pParseCtx->db == NULL) { if (dbName == NULL) {
return buildInvalidOperationMsg(pMsgBuf, msg3); return buildInvalidOperationMsg(pMsgBuf, msg3);
} }
code = tNameSetDbName(pName, pParseCtx->acctId, pParseCtx->db, strlen(pParseCtx->db)); code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
code = buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
return code; return code;
...@@ -227,7 +226,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar ...@@ -227,7 +226,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) { static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) {
SParseContext* pBasicCtx = pCxt->pComCxt; SParseContext* pBasicCtx = pCxt->pComCxt;
SName name = {0}; SName name = {0};
createSName(&name, pTname, pBasicCtx, &pCxt->msg); createSName(&name, pTname, pBasicCtx->acctId, pBasicCtx->db, &pCxt->msg);
if (isStb) { if (isStb) {
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta)); CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
} else { } else {
...@@ -812,7 +811,7 @@ static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, S ...@@ -812,7 +811,7 @@ static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, S
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) { static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
SName name; SName name;
createSName(&name, pTbnameToken, pCxt->pComCxt, &pCxt->msg); createSName(&name, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&name, tbFName); tNameExtractFullName(&name, tbFName);
int32_t len = strlen(tbFName); int32_t len = strlen(tbFName);
...@@ -1009,7 +1008,6 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { ...@@ -1009,7 +1008,6 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
taosHashCleanup(pCxt->pVgroupsHashObj); taosHashCleanup(pCxt->pVgroupsHashObj);
destroyBlockHashmap(pCxt->pTableBlockHashObj); destroyBlockHashmap(pCxt->pTableBlockHashObj);
destroyBlockArrayList(pCxt->pTableDataBlocks);
destroyBlockArrayList(pCxt->pVgDataBlocks); destroyBlockArrayList(pCxt->pVgDataBlocks);
} }
...@@ -1103,15 +1101,16 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { ...@@ -1103,15 +1101,16 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
pCxt->pOutput->stmtCtx.tbUid = pCxt->pTableMeta->uid; pCxt->pOutput->stmtCtx.tbUid = pCxt->pTableMeta->uid;
pCxt->pOutput->stmtCtx.tbSuid = pCxt->pTableMeta->suid;
pCxt->pOutput->stmtCtx.tbType = pCxt->pTableMeta->tableType;
pCxt->pOutput->stmtCtx.pVgroupsHashObj = pCxt->pVgroupsHashObj; pCxt->pOutput->stmtCtx.pVgroupsHashObj = pCxt->pVgroupsHashObj;
pCxt->pOutput->stmtCtx.pTableBlockHashObj = pCxt->pTableBlockHashObj; pCxt->pOutput->stmtCtx.pTableBlockHashObj = pCxt->pTableBlockHashObj;
pCxt->pOutput->stmtCtx.pSubTableHashObj = pCxt->pSubTableHashObj; pCxt->pOutput->stmtCtx.tags = pCxt->tags;
pCxt->pOutput->stmtCtx.pTableDataBlocks = pCxt->pTableDataBlocks;
pCxt->pVgroupsHashObj = NULL; pCxt->pVgroupsHashObj = NULL;
pCxt->pTableBlockHashObj = NULL; pCxt->pTableBlockHashObj = NULL;
pCxt->pSubTableHashObj = NULL; memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pTableDataBlocks = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1152,14 +1151,17 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { ...@@ -1152,14 +1151,17 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT); TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
} }
*pQuery = taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == *pQuery) { if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; *pQuery = taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
} }
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
context.pOutput->payloadType = PAYLOAD_TYPE_KV; context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context); int32_t code = skipInsertInto(&context);
......
...@@ -51,6 +51,33 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) { ...@@ -51,6 +51,33 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) {
return code; return code;
} }
int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) {
SMsgBuf msg = {.buf = msgBuf, .len =msgBufLen};
SToken sToken;
int32_t code = 0;
char *tbName = NULL;
NEXT_TOKEN(pTableName, sToken);
if (sToken.n == 0) {
return buildInvalidOperationMsg(&msg, "empty table name");
}
code = createSName(pName, &sToken, acctId, dbName, &msg);
if (code) {
return code;
}
NEXT_TOKEN(pTableName, sToken);
if (SToken.n > 0) {
return buildInvalidOperationMsg(&msg, "table name format is wrong");
}
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx; SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
...@@ -122,10 +149,8 @@ int32_t qBuildStmtOutput(SQuery* pQuery) { ...@@ -122,10 +149,8 @@ int32_t qBuildStmtOutput(SQuery* pQuery) {
SStmtDataCtx *pCtx = &modifyNode->stmtCtx; SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
int32_t code = 0; int32_t code = 0;
SInsertParseContext insertCtx = { SInsertParseContext insertCtx = {
.pVgroupsHashObj = pCtx->pVgroupsHashObj; .pVgroupsHashObj = pCtx->pVgroupsHashObj,
.pTableBlockHashObj = pCtx->pTableBlockHashObj; .pTableBlockHashObj = pCtx->pTableBlockHashObj,
.pSubTableHashObj = pCtx->pSubTableHashObj;
.pTableDataBlocks = pCtx->pTableDataBlocks;
}; };
// merge according to vgId // merge according to vgId
...@@ -142,6 +167,66 @@ _return: ...@@ -142,6 +167,66 @@ _return:
return code; return code;
} }
int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) {
*fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
if (NULL == *fields) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1];
strcpy((*fields)[i].name, pTagSchema->name);
(*fields)[i].type = pTagSchema->type;
(*fields)[i].bytes = pTagSchema->bytes;
}
*fieldNum = boundInfo->numOfBound;
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtTagFields(SQuery* pQuery, int32_t *fieldNum, TAOS_FIELD** fields) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid));
if (NULL == pDataBlock) {
return TSDB_CODE_QRY_APP_ERROR;
}
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
if (pCtx->tags.numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(&pCtx->tags, pSchema, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtColFields(SQuery* pQuery, int32_t *fieldNum, TAOS_FIELD** fields) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid));
if (NULL == pDataBlock) {
return TSDB_CODE_QRY_APP_ERROR;
}
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
if (pCtx->tags.numOfBound <= 0) {
*fieldNum = 0;
*fields = NULL;
return TSDB_CODE_SUCCESS;
}
CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
void qDestroyQuery(SQuery* pQueryNode) { void qDestroyQuery(SQuery* pQueryNode) {
if (NULL == pQueryNode) { if (NULL == pQueryNode) {
......
...@@ -140,6 +140,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") ...@@ -140,6 +140,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_STATUS_ERROR, "Stmt API usage error") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_STATUS_ERROR, "Stmt API usage error")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
// mnode-common // mnode-common
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error") TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册