diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 4e7fff06b6bd60f015764745d8d093046e812fd6..a1a9155d16d1a46b608e8ae0fe6fe999babc0e21 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -86,26 +86,32 @@ typedef struct STscObj { SAppInstInfo *pAppInfo; } STscObj; -typedef struct SClientResultInfo { - const char *pMsg; +typedef struct SReqResultInfo { + const char *pRspMsg; const char *pData; TAOS_FIELD *fields; - int32_t numOfCols; - int32_t numOfRows; - int32_t current; + uint32_t numOfCols; + int32_t *length; TAOS_ROW row; char **pCol; -} SClientResultInfo; -typedef struct SReqBody { - tsem_t rspSem; // not used now - void* fp; - void* param; - int32_t paramLen; - int64_t execId; // showId/queryId - SClientResultInfo* pResInfo; -} SRequestBody; + uint32_t numOfRows; + uint32_t current; +} SReqResultInfo; + +typedef struct SReqMsg { + void *pMsg; + uint32_t len; +} SReqMsgInfo; + +typedef struct SRequestSendRecvBody { + tsem_t rspSem; // not used now + void* fp; + int64_t execId; // showId/queryId + SReqMsgInfo requestMsg; + SReqResultInfo resInfo; +} SRequestSendRecvBody; #define ERROR_MSG_BUF_DEFAULT_SIZE 512 @@ -115,7 +121,7 @@ typedef struct SRequestObj { STscObj *pTscObj; SQueryExecMetric metric; char *sqlstr; // sql string - SRequestBody body; + SRequestSendRecvBody body; int64_t self; char *msgBuf; int32_t code; @@ -123,11 +129,10 @@ typedef struct SRequestObj { } SRequestObj; typedef struct SRequestMsgBody { - int32_t msgType; - void *pData; - int32_t msgLen; - uint64_t requestId; - uint64_t requestObjRefId; + int32_t msgType; + SReqMsgInfo msgInfo; + uint64_t requestId; + uint64_t requestObjRefId; } SRequestMsgBody; extern SAppInfo appInfo; @@ -158,7 +163,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); void* doFetchRow(SRequestObj* pRequest); -void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9532789ca0e14d7871545b0bb1c54747ddec3608..bab1f9cc9b0aa8ece40d6c3e8a5dccbe677aae3e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -155,8 +155,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { code = qParseQuerySql(pRequest->sqlstr, sqlLen, pRequest->requestId, &type, &output, &outputLen, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE); if (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_SHOW || type == TSDB_SQL_DROP_USER || type == TSDB_SQL_DROP_ACCT || type == TSDB_SQL_CREATE_DB || type == TSDB_SQL_CREATE_ACCT) { pRequest->type = type; - pRequest->body.param = output; - pRequest->body.paramLen = outputLen; + pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = output, .len = outputLen}; SRequestMsgBody body = {0}; buildRequestMsgFp[type](pRequest, &body); @@ -165,6 +164,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); tsem_wait(&pRequest->body.rspSem); + + destroyRequestMsgBody(&body); } else { assert(0); @@ -255,7 +256,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; - pMsgBody->msgLen = sizeof(SConnectMsg); + pMsgBody->msgInfo.len = sizeof(SConnectMsg); pMsgBody->requestObjRefId = pRequest->self; SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); @@ -279,28 +280,28 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) pConnect->startTime = htobe64(appInfo.startTime); tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); - pMsgBody->pData = pConnect; + pMsgBody->msgInfo.pMsg = pConnect; return 0; } static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) { assert(pMsgBody != NULL); - tfree(pMsgBody->pData); + tfree(pMsgBody->msgInfo.pMsg); } int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) { - char *pMsg = rpcMallocCont(pBody->msgLen); + char *pMsg = rpcMallocCont(pBody->msgInfo.len); if (NULL == pMsg) { tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return -1; } - memcpy(pMsg, pBody->pData, pBody->msgLen); + memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len); SRpcMsg rpcMsg = { .msgType = pBody->msgType, .pCont = pMsg, - .contLen = pBody->msgLen, + .contLen = pBody->msgInfo.len, .ahandle = (void*) pBody->requestObjRefId, .handle = NULL, .code = 0 @@ -388,7 +389,7 @@ TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, c void* doFetchRow(SRequestObj* pRequest) { assert(pRequest != NULL); - SClientResultInfo* pResultInfo = pRequest->body.pResInfo; + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { pRequest->type = TSDB_SQL_RETRIEVE_MNODE; @@ -421,7 +422,7 @@ void* doFetchRow(SRequestObj* pRequest) { return pResultInfo->row; } -void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { +void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); if (numOfRows == 0) { return; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 21e632db8d5d3e5a5cb1885e85a0c50e12e9f3e7..8a75799ed78323c2cd63e97456e01858be60f30b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -115,12 +115,7 @@ int taos_field_count(TAOS_RES *res) { } SRequestObj* pRequest = (SRequestObj*) res; - - SClientResultInfo* pResInfo = pRequest->body.pResInfo; - if (pResInfo == NULL) { - return 0; - } - + SReqResultInfo* pResInfo = &pRequest->body.resInfo; return pResInfo->numOfCols; } @@ -133,7 +128,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { return NULL; } - SClientResultInfo* pResInfo = ((SRequestObj*) res)->body.pResInfo; + SReqResultInfo* pResInfo = &(((SRequestObj*) res)->body.resInfo); return pResInfo->fields; } @@ -248,7 +243,7 @@ int* taos_fetch_lengths(TAOS_RES *res) { return NULL; } - return ((SRequestObj*) res)->body.pResInfo->length; + return ((SRequestObj*) res)->body.resInfo.length; } const char *taos_data_type(int type) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 247468274e96cad5ad759b616aa6a41a3194c926..e70dcc63b38f988c055253bed0436aba0834bf4e 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -2793,7 +2793,7 @@ static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) { int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; - pMsgBody->msgLen = sizeof(SConnectMsg); + pMsgBody->msgInfo.len = sizeof(SConnectMsg); pMsgBody->requestObjRefId = pRequest->self; SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); @@ -2817,7 +2817,7 @@ int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pConnect->startTime = htobe64(appInfo.startTime); tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); - pMsgBody->pData = pConnect; + pMsgBody->msgInfo.pMsg = pConnect; return 0; } @@ -2858,17 +2858,14 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo)); - pRequest->body.pResInfo->pMsg = pMsg; - + pRequest->body.resInfo.pRspMsg = pMsg; tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns); return 0; } int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->requestObjRefId = pRequest->self; - pMsgBody->msgLen = pRequest->body.paramLen; - pMsgBody->pData = pRequest->body.param; + pMsgBody->msgInfo = pRequest->body.requestMsg; switch(pRequest->type) { case TSDB_SQL_CREATE_USER: @@ -2886,7 +2883,7 @@ int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { case TSDB_SQL_CREATE_DB: { pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_DB; - SCreateDbMsg* pCreateMsg = pRequest->body.param; + SCreateDbMsg* pCreateMsg = pRequest->body.requestMsg.pMsg; SName name = {0}; int32_t ret = tNameSetDbName(&name, pRequest->pTscObj->acctId, pCreateMsg->db, strnlen(pCreateMsg->db, tListLen(pCreateMsg->db))); if (ret != TSDB_CODE_SUCCESS) { @@ -2925,12 +2922,8 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) pFields[i].bytes = pSchema[i].bytes; } - if (pRequest->body.pResInfo == NULL) { - pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo)); - } - - pRequest->body.pResInfo->pMsg = pMsg; - SClientResultInfo* pResInfo = pRequest->body.pResInfo; + pRequest->body.resInfo.pRspMsg = pMsg; + SReqResultInfo* pResInfo = &pRequest->body.resInfo; pResInfo->fields = pFields; pResInfo->numOfCols = pMetaMsg->numOfColumns; @@ -2944,27 +2937,27 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) int buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - pMsgBody->msgLen = sizeof(SRetrieveTableMsg); + pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg); pMsgBody->requestObjRefId = pRequest->self; SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); pRetrieveMsg->showId = htonl(pRequest->body.execId); - pMsgBody->pData = pRetrieveMsg; + pMsgBody->msgInfo.pMsg = pRetrieveMsg; return TSDB_CODE_SUCCESS; } int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { assert(msgLen >= sizeof(SRetrieveTableRsp)); - tfree(pRequest->body.pResInfo->pMsg); - pRequest->body.pResInfo->pMsg = pMsg; + tfree(pRequest->body.resInfo.pRspMsg); + pRequest->body.resInfo.pRspMsg = pMsg; SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg; pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); pRetrieve->precision = htons(pRetrieve->precision); - SClientResultInfo* pResInfo = pRequest->body.pResInfo; + SReqResultInfo* pResInfo = &pRequest->body.resInfo; pResInfo->numOfRows = pRetrieve->numOfRows; pResInfo->pData = pRetrieve->data; // todo fix this in async model diff --git a/source/client/src/tscEnv.c b/source/client/src/tscEnv.c index 76c37ca2f1734e0e79dc025f9678df48c8fb8e24..182e330df7f9419c09ec1f9643e77df830a9a786 100644 --- a/source/client/src/tscEnv.c +++ b/source/client/src/tscEnv.c @@ -170,7 +170,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty pRequest->type = type; pRequest->pTscObj = pObj; pRequest->body.fp = fp; - pRequest->body.param = param; +// pRequest->body.requestMsg. = param; pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); tsem_init(&pRequest->body.rspSem, 0, 0); @@ -188,10 +188,7 @@ static void doDestroyRequest(void* p) { tfree(pRequest->sqlstr); tfree(pRequest->pInfo); - if (pRequest->body.pResInfo != NULL) { - tfree(pRequest->body.pResInfo->pMsg); - tfree(pRequest->body.pResInfo); - } + tfree(pRequest->body.resInfo.pRspMsg); deregisterRequest(pRequest); tfree(pRequest); diff --git a/source/libs/parser/inc/astToMsg.h b/source/libs/parser/inc/astToMsg.h index 32906f7800de1e724bc2ee1e43511f051ab93eeb..1771bdc0edbeeb649f2c02e583e6d525df20a894 100644 --- a/source/libs/parser/inc/astToMsg.h +++ b/source/libs/parser/inc/astToMsg.h @@ -5,6 +5,7 @@ #include "taosmsg.h" SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); +SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen); SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, char* msgBuf, int32_t msgLen); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 3f9d86737fb1fb76e2b2869bcb4cb9cf521e5c96..42ffbf3c064e60faf148022d2396b06507e4d055 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -24,6 +24,50 @@ SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in return pMsg; } +SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen) { + SCreateAcctMsg* pMsg = (SCreateAcctMsg*)calloc(1, sizeof(SCreateAcctMsg)); + if (pMsg == NULL) { + // tscError("0x%" PRIx64 " failed to malloc for query msg", id); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + + SCreateAcctMsg *pCreateMsg = (SCreateAcctMsg *) calloc(1, sizeof(SCreateAcctMsg)); + + SToken *pName = &pInfo->pMiscInfo->user.user; + SToken *pPwd = &pInfo->pMiscInfo->user.passwd; + + strncpy(pCreateMsg->user, pName->z, pName->n); + strncpy(pCreateMsg->pass, pPwd->z, pPwd->n); + + SCreateAcctInfo *pAcctOpt = &pInfo->pMiscInfo->acctOpt; + + pCreateMsg->maxUsers = htonl(pAcctOpt->maxUsers); + pCreateMsg->maxDbs = htonl(pAcctOpt->maxDbs); + pCreateMsg->maxTimeSeries = htonl(pAcctOpt->maxTimeSeries); + pCreateMsg->maxStreams = htonl(pAcctOpt->maxStreams); +// pCreateMsg->maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond); + pCreateMsg->maxStorage = htobe64(pAcctOpt->maxStorage); +// pCreateMsg->maxQueryTime = htobe64(pAcctOpt->maxQueryTime); +// pCreateMsg->maxConnections = htonl(pAcctOpt->maxConnections); + + if (pAcctOpt->stat.n == 0) { + pCreateMsg->accessState = -1; + } else { + if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) { + pCreateMsg->accessState = TSDB_VN_READ_ACCCESS; + } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) { + pCreateMsg->accessState = TSDB_VN_WRITE_ACCCESS; + } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { + pCreateMsg->accessState = TSDB_VN_ALL_ACCCESS; + } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { + pCreateMsg->accessState = 0; + } + } + + *outputLen = sizeof(SCreateAcctMsg); + return pMsg; +} SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, char* msgBuf, int32_t msgBufLen) { SToken* pName = taosArrayGet(pInfo->pMiscInfo->a, 0); if (pName->n >= TSDB_USER_LEN) { diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index ba96c4a79638aaa5bdf26c68f46dc715daf181e3..accc786bf6787ba52b72721a5da143c4a3d46e86 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4228,6 +4228,42 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in break; } + case TSDB_SQL_CREATE_ACCT: + case TSDB_SQL_ALTER_ACCT: { + const char* msg1 = "invalid state option, available options[no, r, w, all]"; + const char* msg2 = "invalid user/account name"; + const char* msg3 = "name too long"; + + SToken* pName = &pInfo->pMiscInfo->user.user; + SToken* pPwd = &pInfo->pMiscInfo->user.passwd; + + if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + if (pName->n >= TSDB_USER_LEN) { + return buildInvalidOperationMsg(pMsgBuf, msg3); + } + + if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(pMsgBuf, msg2); + } + + SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; + if (pAcctOpt->stat.n > 0) { + if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) { + } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) { + } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { + } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { + } else { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + } + + *output = buildAcctManipulationMsg(pInfo, outputLen, id, msgBuf, msgBufLen); + break; + } + case TSDB_SQL_DROP_ACCT: case TSDB_SQL_DROP_USER: { *output = buildDropUserMsg(pInfo, outputLen, id, msgBuf, msgBufLen);