提交 c41a4bd7 编写于 作者: X Xiaoyu Wang

feat: subplan adds 'user' field

上级 9beb0fad
...@@ -458,6 +458,7 @@ typedef struct SSubplan { ...@@ -458,6 +458,7 @@ typedef struct SSubplan {
int32_t msgType; // message type for subplan, used to denote the send message type to vnode. int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner. int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char user[TSDB_USER_LEN];
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SQueryNodeStat execNodeStat; // only for scan subplan SQueryNodeStat execNodeStat; // only for scan subplan
SNodeList* pChildren; // the datasource subplan,from which to fetch the result SNodeList* pChildren; // the datasource subplan,from which to fetch the result
......
...@@ -24,18 +24,19 @@ extern "C" { ...@@ -24,18 +24,19 @@ extern "C" {
#include "taos.h" #include "taos.h"
typedef struct SPlanContext { typedef struct SPlanContext {
uint64_t queryId; uint64_t queryId;
int32_t acctId; int32_t acctId;
SEpSet mgmtEpSet; SEpSet mgmtEpSet;
SNode* pAstRoot; SNode* pAstRoot;
bool topicQuery; bool topicQuery;
bool streamQuery; bool streamQuery;
bool rSmaQuery; bool rSmaQuery;
bool showRewrite; bool showRewrite;
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
char* pMsg; char* pMsg;
int32_t msgLen; int32_t msgLen;
const char* pUser;
} SPlanContext; } SPlanContext;
// Create the physical plan for the query, according to the AST. // Create the physical plan for the query, according to the AST.
......
...@@ -59,7 +59,7 @@ static STscObj* taosConnectImpl(const char* user, const char* auth, const char* ...@@ -59,7 +59,7 @@ static STscObj* taosConnectImpl(const char* user, const char* auth, const char*
SAppInstInfo* pAppInfo, int connType); SAppInstInfo* pAppInfo, int connType);
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType) { uint16_t port, int connType) {
if (taos_init() != TSDB_CODE_SUCCESS) { if (taos_init() != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -327,8 +327,8 @@ bool qnodeRequired(SRequestObj* pRequest) { ...@@ -327,8 +327,8 @@ bool qnodeRequired(SRequestObj* pRequest) {
} }
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo; SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
bool required = false; bool required = false;
taosThreadMutexLock(&pInfo->qnodeMutex); taosThreadMutexLock(&pInfo->qnodeMutex);
required = (NULL == pInfo->pQnodeList); required = (NULL == pInfo->pQnodeList);
taosThreadMutexUnlock(&pInfo->qnodeMutex); taosThreadMutexUnlock(&pInfo->qnodeMutex);
...@@ -376,7 +376,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra ...@@ -376,7 +376,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.pAstRoot = pQuery->pRoot, .pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite, .showRewrite = pQuery->showRewrite,
.pMsg = pRequest->msgBuf, .pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE}; .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user};
return qCreateQueryPlan(&cxt, pPlan, pNodeList); return qCreateQueryPlan(&cxt, pPlan, pNodeList);
} }
...@@ -433,11 +434,11 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr ...@@ -433,11 +434,11 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr
} }
for (int32_t j = 0; j < vgNum; ++j) { for (int32_t j = 0; j < vgNum; ++j) {
SVgroupInfo* pInfo = taosArrayGet(pVg, j); SVgroupInfo* pInfo = taosArrayGet(pVg, j);
SQueryNodeLoad load = {0}; SQueryNodeLoad load = {0};
load.addr.nodeId = pInfo->vgId; load.addr.nodeId = pInfo->vgId;
load.addr.epSet = pInfo->epSet; load.addr.epSet = pInfo->epSet;
taosArrayPush(nodeList, &load); taosArrayPush(nodeList, &load);
} }
} }
...@@ -495,17 +496,16 @@ _return: ...@@ -495,17 +496,16 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData *pResultMeta) {
SArray* pDbVgList = NULL; SArray* pDbVgList = NULL;
SArray* pQnodeList = NULL; SArray* pQnodeList = NULL;
int32_t code = 0; int32_t code = 0;
switch (tsQueryPolicy) { switch (tsQueryPolicy) {
case QUERY_POLICY_VNODE: { case QUERY_POLICY_VNODE: {
if (pResultMeta) { if (pResultMeta) {
pDbVgList = taosArrayInit(4, POINTER_BYTES); pDbVgList = taosArrayInit(4, POINTER_BYTES);
int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup); int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
for (int32_t i = 0; i < dbNum; ++i) { for (int32_t i = 0; i < dbNum; ++i) {
SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i); SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
...@@ -514,9 +514,9 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray ...@@ -514,9 +514,9 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
} }
taosArrayPush(pDbVgList, &pRes->pRes); taosArrayPush(pDbVgList, &pRes->pRes);
} }
} }
code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList); code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
break; break;
} }
...@@ -537,7 +537,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray ...@@ -537,7 +537,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
} }
taosThreadMutexUnlock(&pInst->qnodeMutex); taosThreadMutexUnlock(&pInst->qnodeMutex);
} }
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
break; break;
} }
...@@ -548,7 +548,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray ...@@ -548,7 +548,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
taosArrayDestroy(pDbVgList); taosArrayDestroy(pDbVgList);
taosArrayDestroy(pQnodeList); taosArrayDestroy(pQnodeList);
return code; return code;
} }
...@@ -556,43 +556,43 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* ...@@ -556,43 +556,43 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
SArray* pDbVgList = NULL; SArray* pDbVgList = NULL;
SArray* pQnodeList = NULL; SArray* pQnodeList = NULL;
int32_t code = 0; int32_t code = 0;
switch (tsQueryPolicy) { switch (tsQueryPolicy) {
case QUERY_POLICY_VNODE: { case QUERY_POLICY_VNODE: {
int32_t dbNum = taosArrayGetSize(pRequest->dbList); int32_t dbNum = taosArrayGetSize(pRequest->dbList);
if (dbNum > 0) { if (dbNum > 0) {
SCatalog* pCtg = NULL; SCatalog* pCtg = NULL;
SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo; SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
code = catalogGetHandle(pInst->clusterId, &pCtg); code = catalogGetHandle(pInst->clusterId, &pCtg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _return; goto _return;
} }
pDbVgList = taosArrayInit(dbNum, POINTER_BYTES); pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
SArray* pVgList = NULL; SArray* pVgList = NULL;
for (int32_t i = 0; i < dbNum; ++i) { for (int32_t i = 0; i < dbNum; ++i) {
char* dbFName = taosArrayGet(pRequest->dbList, i); char* dbFName = taosArrayGet(pRequest->dbList, i);
SRequestConnInfo conn = {.pTrans = pInst->pTransporter, SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pInst->mgmtEp)}; .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
code = catalogGetDBVgInfo(pCtg, &conn, dbFName, &pVgList); code = catalogGetDBVgInfo(pCtg, &conn, dbFName, &pVgList);
if (code) { if (code) {
goto _return; goto _return;
} }
taosArrayPush(pDbVgList, &pVgList); taosArrayPush(pDbVgList, &pVgList);
} }
} }
code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList); code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
break; break;
} }
case QUERY_POLICY_HYBRID: case QUERY_POLICY_HYBRID:
case QUERY_POLICY_QNODE: { case QUERY_POLICY_QNODE: {
getQnodeList(pRequest, &pQnodeList); getQnodeList(pRequest, &pQnodeList);
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
break; break;
} }
...@@ -605,11 +605,10 @@ _return: ...@@ -605,11 +605,10 @@ _return:
taosArrayDestroy(pDbVgList); taosArrayDestroy(pDbVgList);
taosArrayDestroy(pQnodeList); taosArrayDestroy(pQnodeList);
return code; return code;
} }
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
tsem_init(&schdRspSem, 0, 0); tsem_init(&schdRspSem, 0, 0);
...@@ -833,8 +832,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { ...@@ -833,8 +832,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
} }
} }
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
pRequest->self, code, tstrerror(code), pRequest->requestId); tstrerror(code), pRequest->requestId);
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
...@@ -880,7 +879,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue ...@@ -880,7 +879,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL; SArray* pNodeList = NULL;
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList); buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList); code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
taosArrayDestroy(pNodeList); taosArrayDestroy(pNodeList);
} }
...@@ -935,7 +934,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool val ...@@ -935,7 +934,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool val
return launchQueryImpl(pRequest, pQuery, false, NULL); return launchQueryImpl(pRequest, pQuery, false, NULL);
} }
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta) { void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta) {
int32_t code = 0; int32_t code = 0;
switch (pQuery->execMode) { switch (pQuery->execMode) {
...@@ -968,7 +967,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM ...@@ -968,7 +967,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL; SArray* pNodeList = NULL;
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta); buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
SRequestConnInfo conn = { SRequestConnInfo conn = {
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
SSchedulerReq req = {.pConn = &conn, SSchedulerReq req = {.pConn = &conn,
...@@ -1328,7 +1327,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons ...@@ -1328,7 +1327,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
if (pObj) { if (pObj) {
return pObj->id; return pObj->id;
} }
return NULL; return NULL;
} }
...@@ -1500,10 +1499,10 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -1500,10 +1499,10 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows){ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
char* p = (char*)pResultInfo->pData; char* p = (char*)pResultInfo->pData;
int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)); int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
int32_t* colLength = (int32_t*)(p + len); int32_t* colLength = (int32_t*)(p + len);
len += sizeof(int32_t) * numOfCols; len += sizeof(int32_t) * numOfCols;
...@@ -1513,7 +1512,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i ...@@ -1513,7 +1512,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
int32_t* offset = (int32_t*)pStart; int32_t* offset = (int32_t*)pStart;
int32_t lenTmp = numOfRows * sizeof(int32_t); int32_t lenTmp = numOfRows * sizeof(int32_t);
len += lenTmp; len += lenTmp;
pStart += lenTmp; pStart += lenTmp;
...@@ -1538,7 +1537,6 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i ...@@ -1538,7 +1537,6 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
} else { } else {
ASSERT(0); ASSERT(0);
} }
} }
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
int32_t lenTmp = numOfRows * sizeof(int32_t); int32_t lenTmp = numOfRows * sizeof(int32_t);
...@@ -1562,13 +1560,13 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1562,13 +1560,13 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
break; break;
} }
} }
if(!needConvert) return TSDB_CODE_SUCCESS; if (!needConvert) return TSDB_CODE_SUCCESS;
char* p = (char*)pResultInfo->pData; char* p = (char*)pResultInfo->pData;
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows); int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
pResultInfo->convertJson = taosMemoryCalloc(1, dataLen); pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
if(pResultInfo->convertJson == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (pResultInfo->convertJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
char* p1 = pResultInfo->convertJson; char* p1 = pResultInfo->convertJson;
int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)); int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
...@@ -1637,7 +1635,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1637,7 +1635,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
ASSERT(0); ASSERT(0);
} }
offset1[j]= len; offset1[j] = len;
memcpy(pStart1 + len, dst, varDataTLen(dst)); memcpy(pStart1 + len, dst, varDataTLen(dst));
len += varDataTLen(dst); len += varDataTLen(dst);
} }
...@@ -1655,7 +1653,6 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1655,7 +1653,6 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
pStart += len; pStart += len;
pStart1 += len; pStart1 += len;
memcpy(pStart1, pStart, colLen); memcpy(pStart1, pStart, colLen);
} }
pStart += colLen; pStart += colLen;
pStart1 += colLen1; pStart1 += colLen1;
...@@ -1723,7 +1720,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 ...@@ -1723,7 +1720,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
pStart += colLength[i]; pStart += colLength[i];
} }
if(convertUcs4){ if (convertUcs4) {
code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength); code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
} }
...@@ -1840,17 +1837,18 @@ _OVER: ...@@ -1840,17 +1837,18 @@ _OVER:
return code; return code;
} }
int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str, int32_t acctId, char* db) { int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
int32_t acctId, char* db) {
SName name; SName name;
if (len1 <= 0) { if (len1 <= 0) {
return -1; return -1;
} }
const char *dbName = db; const char* dbName = db;
const char *tbName = NULL; const char* tbName = NULL;
int32_t dbLen = 0; int32_t dbLen = 0;
int32_t tbLen = 0; int32_t tbLen = 0;
if (len2 > 0) { if (len2 > 0) {
dbName = str + pos1; dbName = str + pos1;
dbLen = len1; dbLen = len1;
...@@ -1861,7 +1859,7 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i ...@@ -1861,7 +1859,7 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i
tbName = str + pos1; tbName = str + pos1;
tbLen = len1; tbLen = len1;
} }
if (tNameSetDbName(&name, acctId, dbName, dbLen)) { if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
return -1; return -1;
} }
...@@ -1881,18 +1879,18 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -1881,18 +1879,18 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
bool inEscape = false; bool inEscape = false;
int32_t code = 0; int32_t code = 0;
int32_t vIdx = 0; int32_t vIdx = 0;
int32_t vPos[2]; int32_t vPos[2];
int32_t vLen[2]; int32_t vLen[2];
memset(vPos, -1, sizeof(vPos)); memset(vPos, -1, sizeof(vPos));
memset(vLen, 0, sizeof(vLen)); memset(vLen, 0, sizeof(vLen));
for (int32_t i = 0; ; ++i) { for (int32_t i = 0;; ++i) {
if (0 == *(tbList + i)) { if (0 == *(tbList + i)) {
if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) { if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
vLen[vIdx] = i - vPos[vIdx]; vLen[vIdx] = i - vPos[vIdx];
...@@ -1905,7 +1903,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -1905,7 +1903,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
break; break;
} }
if ('`' == *(tbList + i)) { if ('`' == *(tbList + i)) {
inEscape = !inEscape; inEscape = !inEscape;
if (!inEscape) { if (!inEscape) {
...@@ -1952,7 +1950,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -1952,7 +1950,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
if (code) { if (code) {
goto _return; goto _return;
} }
memset(vPos, -1, sizeof(vPos)); memset(vPos, -1, sizeof(vPos));
memset(vLen, 0, sizeof(vLen)); memset(vLen, 0, sizeof(vLen));
vIdx = 0; vIdx = 0;
...@@ -1966,8 +1964,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -1966,8 +1964,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
continue; continue;
} }
if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
('0' <= *(tbList + i) && '9' >= *(tbList + i))) { ('0' <= *(tbList + i) && '9' >= *(tbList + i))) {
if (vLen[vIdx] > 0) { if (vLen[vIdx] > 0) {
goto _return; goto _return;
...@@ -1989,32 +1986,31 @@ _return: ...@@ -1989,32 +1986,31 @@ _return:
taosArrayDestroy(*pReq); taosArrayDestroy(*pReq);
*pReq = NULL; *pReq = NULL;
return terrno; return terrno;
} }
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) { void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
SSyncQueryParam *pParam = param; SSyncQueryParam* pParam = param;
pParam->pRequest->code = code; pParam->pRequest->code = code;
tsem_post(&pParam->sem); tsem_post(&pParam->sem);
} }
void syncQueryFn(void* param, void* res, int32_t code) {
void syncQueryFn(void *param, void *res, int32_t code) { SSyncQueryParam* pParam = param;
SSyncQueryParam *pParam = param;
pParam->pRequest = res; pParam->pRequest = res;
pParam->pRequest->code = code; pParam->pRequest->code = code;
tsem_post(&pParam->sem); tsem_post(&pParam->sem);
} }
void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, bool validateOnly) { void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
if (pTscObj == NULL || sql == NULL || NULL == fp) { if (pTscObj == NULL || sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) { if (pTscObj) {
releaseTscObj(*(int64_t *)taos); releaseTscObj(*(int64_t*)taos);
} else { } else {
terrno = TSDB_CODE_TSC_DISCONNECTED; terrno = TSDB_CODE_TSC_DISCONNECTED;
} }
...@@ -2031,7 +2027,7 @@ void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void ...@@ -2031,7 +2027,7 @@ void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void
return; return;
} }
SRequestObj *pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
...@@ -2045,45 +2041,41 @@ void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void ...@@ -2045,45 +2041,41 @@ void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void
doAsyncQuery(pRequest, false); doAsyncQuery(pRequest, false);
} }
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
TAOS_RES *taosQueryImpl(TAOS *taos, const char *sql, bool validateOnly) {
if (NULL == taos) { if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED; terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
} }
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
if (pTscObj == NULL || sql == NULL) { if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED; terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
} }
#if SYNC_ON_TOP_OF_ASYNC #if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0); tsem_init(&param->sem, 0, 0);
taosAsyncQueryImpl(taos, sql, syncQueryFn, param, validateOnly); taosAsyncQueryImpl(taos, sql, syncQueryFn, param, validateOnly);
tsem_wait(&param->sem); tsem_wait(&param->sem);
releaseTscObj(*(int64_t *)taos); releaseTscObj(*(int64_t*)taos);
return param->pRequest; return param->pRequest;
#else #else
size_t sqlLen = strlen(sql); size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj(*(int64_t *)taos); releaseTscObj(*(int64_t*)taos);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL; return NULL;
} }
TAOS_RES *pRes = execQuery(pTscObj, sql, sqlLen, validateOnly); TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen, validateOnly);
releaseTscObj(*(int64_t *)taos); releaseTscObj(*(int64_t*)taos);
return pRes; return pRes;
#endif #endif
} }
...@@ -2294,6 +2294,7 @@ static const char* jkSubplanType = "SubplanType"; ...@@ -2294,6 +2294,7 @@ static const char* jkSubplanType = "SubplanType";
static const char* jkSubplanMsgType = "MsgType"; static const char* jkSubplanMsgType = "MsgType";
static const char* jkSubplanLevel = "Level"; static const char* jkSubplanLevel = "Level";
static const char* jkSubplanDbFName = "DbFName"; static const char* jkSubplanDbFName = "DbFName";
static const char* jkSubplanUser = "User";
static const char* jkSubplanNodeAddr = "NodeAddr"; static const char* jkSubplanNodeAddr = "NodeAddr";
static const char* jkSubplanRootNode = "RootNode"; static const char* jkSubplanRootNode = "RootNode";
static const char* jkSubplanDataSink = "DataSink"; static const char* jkSubplanDataSink = "DataSink";
...@@ -2316,6 +2317,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) { ...@@ -2316,6 +2317,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkSubplanDbFName, pNode->dbFName); code = tjsonAddStringToObject(pJson, jkSubplanDbFName, pNode->dbFName);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkSubplanUser, pNode->user);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanNodeAddr, queryNodeAddrToJson, &pNode->execNode); code = tjsonAddObject(pJson, jkSubplanNodeAddr, queryNodeAddrToJson, &pNode->execNode);
} }
...@@ -2352,6 +2356,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) { ...@@ -2352,6 +2356,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkSubplanDbFName, pNode->dbFName); code = tjsonGetStringValue(pJson, jkSubplanDbFName, pNode->dbFName);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkSubplanUser, pNode->user);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkSubplanNodeAddr, jsonToQueryNodeAddr, &pNode->execNode); code = tjsonToObject(pJson, jkSubplanNodeAddr, jsonToQueryNodeAddr, &pNode->execNode);
} }
......
...@@ -1453,6 +1453,7 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl ...@@ -1453,6 +1453,7 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
pSubplan->id = pLogicSubplan->id; pSubplan->id = pLogicSubplan->id;
pSubplan->subplanType = pLogicSubplan->subplanType; pSubplan->subplanType = pLogicSubplan->subplanType;
pSubplan->level = pLogicSubplan->level; pSubplan->level = pLogicSubplan->level;
strcpy(pSubplan->user, pCxt->pPlanCxt->pUser);
return pSubplan; return pSubplan;
} }
......
...@@ -85,8 +85,9 @@ class PlannerTestBaseImpl { ...@@ -85,8 +85,9 @@ class PlannerTestBaseImpl {
public: public:
PlannerTestBaseImpl() : sqlNo_(0) {} PlannerTestBaseImpl() : sqlNo_(0) {}
void useDb(const string& acctId, const string& db) { void useDb(const string& user, const string& db) {
caseEnv_.acctId_ = acctId; caseEnv_.acctId_ = 0;
caseEnv_.user_ = user;
caseEnv_.db_ = db; caseEnv_.db_ = db;
caseEnv_.nsql_ = g_skipSql; caseEnv_.nsql_ = g_skipSql;
} }
...@@ -193,7 +194,8 @@ class PlannerTestBaseImpl { ...@@ -193,7 +194,8 @@ class PlannerTestBaseImpl {
private: private:
struct caseEnv { struct caseEnv {
string acctId_; int32_t acctId_;
string user_;
string db_; string db_;
int32_t nsql_; int32_t nsql_;
...@@ -295,7 +297,7 @@ class PlannerTestBaseImpl { ...@@ -295,7 +297,7 @@ class PlannerTestBaseImpl {
transform(stmtEnv_.sql_.begin(), stmtEnv_.sql_.end(), stmtEnv_.sql_.begin(), ::tolower); transform(stmtEnv_.sql_.begin(), stmtEnv_.sql_.end(), stmtEnv_.sql_.begin(), ::tolower);
SParseContext cxt = {0}; SParseContext cxt = {0};
cxt.acctId = atoi(caseEnv_.acctId_.c_str()); cxt.acctId = caseEnv_.acctId_;
cxt.db = caseEnv_.db_.c_str(); cxt.db = caseEnv_.db_.c_str();
cxt.pSql = stmtEnv_.sql_.c_str(); cxt.pSql = stmtEnv_.sql_.c_str();
cxt.sqlLen = stmtEnv_.sql_.length(); cxt.sqlLen = stmtEnv_.sql_.length();
...@@ -319,12 +321,13 @@ class PlannerTestBaseImpl { ...@@ -319,12 +321,13 @@ class PlannerTestBaseImpl {
void doParseBoundSql(SQuery* pQuery) { void doParseBoundSql(SQuery* pQuery) {
SParseContext cxt = {0}; SParseContext cxt = {0};
cxt.acctId = atoi(caseEnv_.acctId_.c_str()); cxt.acctId = caseEnv_.acctId_;
cxt.db = caseEnv_.db_.c_str(); cxt.db = caseEnv_.db_.c_str();
cxt.pSql = stmtEnv_.sql_.c_str(); cxt.pSql = stmtEnv_.sql_.c_str();
cxt.sqlLen = stmtEnv_.sql_.length(); cxt.sqlLen = stmtEnv_.sql_.length();
cxt.pMsg = stmtEnv_.msgBuf_.data(); cxt.pMsg = stmtEnv_.msgBuf_.data();
cxt.msgLen = stmtEnv_.msgBuf_.max_size(); cxt.msgLen = stmtEnv_.msgBuf_.max_size();
cxt.pUser = caseEnv_.user_.c_str();
DO_WITH_THROW(qStmtParseQuerySql, &cxt, pQuery); DO_WITH_THROW(qStmtParseQuerySql, &cxt, pQuery);
res_.ast_ = toString(pQuery->pRoot); res_.ast_ = toString(pQuery->pRoot);
...@@ -364,6 +367,7 @@ class PlannerTestBaseImpl { ...@@ -364,6 +367,7 @@ class PlannerTestBaseImpl {
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
pCxt->queryId = 1; pCxt->queryId = 1;
pCxt->pUser = caseEnv_.user_.c_str();
if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) { if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) {
pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery; pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery;
pCxt->topicQuery = true; pCxt->topicQuery = true;
...@@ -403,7 +407,7 @@ PlannerTestBase::PlannerTestBase() : impl_(new PlannerTestBaseImpl()) {} ...@@ -403,7 +407,7 @@ PlannerTestBase::PlannerTestBase() : impl_(new PlannerTestBaseImpl()) {}
PlannerTestBase::~PlannerTestBase() {} PlannerTestBase::~PlannerTestBase() {}
void PlannerTestBase::useDb(const std::string& acctId, const std::string& db) { impl_->useDb(acctId, db); } void PlannerTestBase::useDb(const std::string& user, const std::string& db) { impl_->useDb(user, db); }
void PlannerTestBase::run(const std::string& sql) { return impl_->run(sql); } void PlannerTestBase::run(const std::string& sql) { return impl_->run(sql); }
......
...@@ -30,7 +30,7 @@ class PlannerTestBase : public testing::Test { ...@@ -30,7 +30,7 @@ class PlannerTestBase : public testing::Test {
PlannerTestBase(); PlannerTestBase();
virtual ~PlannerTestBase(); virtual ~PlannerTestBase();
void useDb(const std::string& acctId, const std::string& db); void useDb(const std::string& user, const std::string& db);
void run(const std::string& sql); void run(const std::string& sql);
// stmt mode APIs // stmt mode APIs
void prepare(const std::string& sql); void prepare(const std::string& sql);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册