未验证 提交 b2e51d95 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #13527 from taosdata/feature/3_liaohj

fix(query): check array length before generating the retention string .
......@@ -224,7 +224,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define REQUEST_MAX_TRY_TIMES 1
#define REQUEST_TOTAL_EXEC_TIMES 2
#define qFatal(...) \
do { \
......
......@@ -212,6 +212,9 @@ typedef struct SRequestObj {
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog
uint32_t retry;
} SRequestObj;
typedef struct SSyncQueryParam {
......@@ -263,8 +266,8 @@ extern SAppInfo appInfo;
extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool;
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo);
......@@ -274,7 +277,7 @@ int32_t releaseTscObj(int64_t rid);
uint64_t generateRequestId();
void* createRequest(STscObj* pObj, void* param, int32_t type);
void* createRequest(STscObj* pObj, int32_t type);
void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
......@@ -290,8 +293,6 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
......@@ -325,6 +326,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);// todo move to clientImpl.c and become a static function
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);// todo move to xxx
#ifdef __cplusplus
}
......
......@@ -177,7 +177,7 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
void *createRequest(STscObj *pObj, void *param, int32_t type) {
void *createRequest(STscObj *pObj, int32_t type) {
assert(pObj != NULL);
SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
......@@ -191,8 +191,6 @@ void *createRequest(STscObj *pObj, void *param, int32_t type) {
pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampUs();
pRequest->body.param = param;
pRequest->type = type;
pRequest->pTscObj = pObj;
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
......@@ -280,7 +278,6 @@ void taos_init_imp(void) {
return;
}
initMsgHandleFp();
initQueryModuleMsgHandle();
rpcInit();
......
......@@ -133,7 +133,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
}
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
*pRequest = createRequest(pTscObj, NULL, TSDB_SQL_SELECT);
*pRequest = createRequest(pTscObj, TSDB_SQL_SELECT);
if (*pRequest == NULL) {
tscError("failed to malloc sqlObj");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -207,6 +207,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
}
return code;
}
......@@ -235,6 +236,31 @@ static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
return pRequest->pTscObj->pAppInfo;
}
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL;
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
}
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->code = code;
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} else {
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
pRequest->requestId);
}
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
}
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
// drop table if exists not_exists_table
if (NULL == pQuery->pCmdMsg) {
......@@ -509,19 +535,20 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}
int32_t handleExecRes(SRequestObj* pRequest) {
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
if (NULL == pRequest->body.resInfo.execRes.res) {
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
if (code) {
return code;
}
SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp);
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
switch (pRes->msgType) {
......@@ -539,8 +566,9 @@ int32_t handleExecRes(SRequestObj* pRequest) {
break;
}
default:
tscError("invalid exec result for request type %d", pRequest->type);
return TSDB_CODE_APP_ERROR;
tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self,
pRequest->type, pRequest->requestId);
code = TSDB_CODE_APP_ERROR;
}
return code;
......@@ -548,6 +576,25 @@ int32_t handleExecRes(SRequestObj* pRequest) {
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param;
pRequest->code = code;
STscObj* pTscObj = pRequest->pTscObj;
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
pRequest->prevCode = code;
doAsyncQuery(pRequest, true);
return;
}
if (code == TSDB_CODE_SUCCESS) {
code = handleQueryExecRsp(pRequest);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
pRequest->code = code;
}
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList);
}
// return to client
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
......@@ -583,7 +630,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
qDestroyQuery(pQuery);
}
handleExecRes(pRequest);
handleQueryExecRsp(pRequest);
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
......@@ -617,13 +664,12 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
}
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
void* pRes = NULL;
int32_t code = 0;
switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL:
code = execLocalCmd(pRequest, pQuery);
break;
asyncExecLocalCmd(pRequest, pQuery);
return;
case QUERY_EXEC_MODE_RPC:
code = asyncExecDdlQuery(pRequest, pQuery);
break;
......@@ -649,11 +695,9 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) {
schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob,
pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest);
// if (NULL != pRes) {
// code = validateSversion(pRequest, pRes);
// }
}
//todo not to be released here
taosArrayDestroy(pNodeList);
break;
}
......@@ -671,12 +715,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
}
// if (res) {
// *res = pRes;
// } else {
// freeRequestRes(pRequest, pRes);
// pRes = NULL;
}
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
......@@ -750,7 +788,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
pRequest->code = code;
break;
}
} while (retryNum++ < REQUEST_MAX_TRY_TIMES);
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList);
......@@ -808,7 +846,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj;
}
SRequestObj* pRequest = createRequest(pTscObj, param, TDMT_MND_CONNECT);
SRequestObj* pRequest = createRequest(pTscObj, TDMT_MND_CONNECT);
if (pRequest == NULL) {
destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -850,7 +888,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->fp = getMsgRspHandle(pMsgSendInfo->msgType);
pMsgSendInfo->param = pRequest;
SConnectReq connectReq = {0};
......@@ -1429,7 +1467,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
}
code = statusRsp.statusCode;
if (details != NULL && statusRsp.details != NULL) {
if (details != NULL) {
tstrncpy(details, statusRsp.details, maxlen);
}
......
......@@ -30,6 +30,7 @@
#define TSC_VAR_RELEASED 0
static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt);
int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0;
......@@ -192,7 +193,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
STscObj* pTscObj = (STscObj*)taos;
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(struct SSyncQueryParam));
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param);
......@@ -608,36 +609,38 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
SQuery* pQuery = pWrapper->pQuery;
SRequestObj* pRequest = pWrapper->pRequest;
if (code != TSDB_CODE_SUCCESS) {
goto _error;
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
}
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (pQuery->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, (pQuery)->numOfResCols);
setResPrecision(&pRequest->body.resInfo, (pQuery)->precision);
}
if (code == TSDB_CODE_SUCCESS) {
if (pQuery->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
setResPrecision(&pRequest->body.resInfo, pQuery->precision);
}
TSWAP(pRequest->dbList, (pQuery)->pDbList);
TSWAP(pRequest->tableList, (pQuery)->pTableList);
TSWAP(pRequest->dbList, (pQuery)->pDbList);
TSWAP(pRequest->tableList, (pQuery)->pTableList);
taosMemoryFree(pWrapper);
launchAsyncQuery(pRequest, pQuery);
return;
taosMemoryFree(pWrapper);
launchAsyncQuery(pRequest, pQuery);
} else {
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
pRequest->prevCode = code;
doAsyncQuery(pRequest, true);
return;
}
_error:
taosMemoryFree(pWrapper);
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
// return to app directly
taosMemoryFree(pWrapper);
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
}
// todo add retry before return user's callback
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
ASSERT(fp != NULL);
......@@ -657,24 +660,27 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
}
SRequestObj *pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
// while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
code = buildRequest(taos, sql, sqlLen, &pRequest);
int32_t code = buildRequest(taos, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
fp(param, NULL, code);
fp(param, NULL, terrno);
return;
}
pRequest->body.queryFp = fp;
pRequest->body.param = param;
doAsyncQuery(pRequest, false);
}
STscObj *pTscObj = pRequest->pTscObj;
int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
const STscObj *pTscObj = pRequest->pTscObj;
*pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
if (*pCxt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SParseContext* pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
*pCxt = (SParseContext){.requestId = pRequest->requestId,
**pCxt = (SParseContext){.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = false,
......@@ -687,6 +693,22 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
.pUser = pTscObj->user,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,};
return TSDB_CODE_SUCCESS;
}
void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
SParseContext* pCxt = NULL;
STscObj *pTscObj = pRequest->pTscObj;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
pRequest->code = pRequest->prevCode;
goto _error;
}
int32_t code = createParseContext(pRequest, &pCxt);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pCxt->mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCxt->pCatalog);
......@@ -694,39 +716,36 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
goto _error;
}
SQuery * pQuery = NULL;
SCatalogReq catalogReq = {0};
SQuery *pQuery = NULL;
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce};
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SqlParseWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SqlParseWrapper));
if (pWrapper == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pWrapper->pCtx = pCxt;
pWrapper->pQuery = pQuery;
pWrapper->pRequest = pRequest;
pWrapper->catalogReq = catalogReq;
code = catalogAsyncGetAllMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pRequest->requestId,
&catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
&catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob);
if (code == TSDB_CODE_SUCCESS) {
return;
}
return;
// todo handle the retry process
// if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
// TSWAP(pRequest->dbList, (pQuery)->pDbList);
// TSWAP(pRequest->tableList, (pQuery)->pTableList);
// }
_error:
tscError("0x%"PRIx64" error happens, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
terrno = code;
pRequest->code = code;
fp(param, pRequest, code);
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
static void fetchCallback(void* pResult, void* param, int32_t code) {
......@@ -751,14 +770,15 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->code = code;
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} else {
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
pRequest->requestId);
}
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
}
......
......@@ -21,8 +21,6 @@
#include "tdef.h"
#include "tname.h"
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
static void setErrno(SRequestObj* pRequest, int32_t code) {
pRequest->code = code;
terrno = code;
......@@ -107,10 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
assert(pRequest != NULL);
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)
? genericRspCallback
: handleRequestRspFp[TMSG_INDEX(pRequest->type)];
pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
return pMsgSendInfo;
}
......@@ -209,7 +204,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
......@@ -219,6 +214,7 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
if (pRequest->body.queryFp != NULL) {
removeMeta(pRequest->pTscObj, pRequest->tableList);
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {
tsem_post(&pRequest->body.rspSem);
......@@ -251,30 +247,54 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
} else {
SMAlterStbRsp alterRsp = {0};
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMAlterStbRsp(&coder, &alterRsp);
tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
}
SMAlterStbRsp alterRsp = {0};
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMAlterStbRsp(&coder, &alterRsp);
tDecoderClear(&coder);
if (pRequest->body.queryFp != NULL) {
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
if (code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
int32_t ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (pRes->res != NULL) {
ret = handleAlterTbExecRes(pRes->res, pCatalog);
}
tsem_post(&pRequest->body.rspSem);
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
}
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {
tsem_post(&pRequest->body.rspSem);
}
return code;
}
// todo refactor: this arraylist is too large
void initMsgHandleFp() {
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = processAlterStbRsp;
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
switch (msgType) {
case TDMT_MND_CONNECT:
return processConnectRsp;
case TDMT_MND_CREATE_DB:
return processCreateDbRsp;
case TDMT_MND_USE_DB:
return processUseDbRsp;
case TDMT_MND_CREATE_STB:
return processCreateSTableRsp;
case TDMT_MND_DROP_DB:
return processDropDbRsp;
case TDMT_MND_ALTER_STB:
return processAlterStbRsp;
default:
return genericRspCallback;
}
}
......@@ -2364,7 +2364,7 @@ static int32_t isSchemalessDb(SSmlHandle* info){
*/
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT);
SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
if(!request){
uError("SML:taos_schemaless_insert error request is null");
return NULL;
......
......@@ -778,7 +778,35 @@ TEST(testCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query_a(pConn, "drop table test.tm0", queryCallback, pConn);
taos_query(pConn, "use test");
TAOS_RES* pRes = taos_query(pConn, "select * from t1");
taos_query(pConn, "alter table t1 add column b int");
pRes = taos_query(pConn, "insert into t1 values(now, 1, 2)");
if (taos_errno(pRes) != 0) {
printf("failed, reason:%s\n", taos_errstr(pRes));
}
// int32_t n = 0;
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t* length = taos_fetch_lengths(pRes);
// for(int32_t i = 0; i < numOfFields; ++i) {
// printf("(%d):%d " , i, length[i]);
// }
// printf("\n");
//
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// memset(str, 0, sizeof(str));
// }
taos_query_a(pConn, "alter table test.m1 comment 'abcde' ", queryCallback, pConn);
getchar();
taos_close(pConn);
}
......
......@@ -486,7 +486,7 @@ TEST(testCase, smlProcess_influx_Test) {
pRes = taos_query(taos, "use inflx_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -607,7 +607,7 @@ TEST(testCase, smlParseLine_error_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -656,7 +656,7 @@ TEST(testCase, smlProcess_telnet_Test) {
pRes = taos_query(taos, "use telnet_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -710,7 +710,7 @@ TEST(testCase, smlProcess_json1_Test) {
pRes = taos_query(taos, "use json_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -779,7 +779,7 @@ TEST(testCase, smlProcess_json2_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -823,7 +823,7 @@ TEST(testCase, smlProcess_json3_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -895,7 +895,7 @@ TEST(testCase, smlProcess_json4_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -957,7 +957,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -1006,7 +1006,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -1033,7 +1033,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -1101,7 +1101,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -1146,7 +1146,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -1191,7 +1191,7 @@ TEST(testCase, sml_TD15662_Test) {
pRes = taos_query(taos, "use db_15662");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
......@@ -1218,7 +1218,7 @@ TEST(testCase, sml_TD15735_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
......@@ -1244,7 +1244,7 @@ TEST(testCase, sml_TD15742_Test) {
pRes = taos_query(taos, "use TD15742");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
......
......@@ -1392,19 +1392,25 @@ char *buildRetension(SArray *pRetension) {
int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
p = taosArrayGet(pRetension, 1);
if (size > 1) {
len += sprintf(p1 + len, ",");
p = taosArrayGet(pRetension, 1);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
}
p = taosArrayGet(pRetension, 2);
if (size > 2) {
len += sprintf(p1 + len, ",");
p = taosArrayGet(pRetension, 2);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
}
varDataSetLen(p1, len);
return p1;
......
......@@ -518,6 +518,7 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
char *ctgTaskTypeStr(CTG_TASK_TYPE type);
extern SCatalogMgmt gCtgMgmt;
......
......@@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -67,7 +67,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -90,7 +90,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -113,7 +113,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -143,7 +143,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -158,7 +158,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, task.type);
qDebug("QID:%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS;
}
......@@ -181,7 +181,7 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, task.type, name);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS;
}
......@@ -204,7 +204,7 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, task.type, name);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS;
}
......@@ -227,11 +227,96 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, task.type, user->user);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user);
return TSDB_CODE_SUCCESS;
}
int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) {
int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum;
if (dbNum > 0) {
if (dbNum > pJob->dbCfgNum && dbNum > pJob->dbVgNum && dbNum > pJob->dbInfoNum) {
SHashObj* pDb = taosHashInit(dbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pDb) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
}
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
}
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
}
char* dbFName = taosHashIterate(pDb, NULL);
while (dbFName) {
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
dbFName = taosHashIterate(pDb, dbFName);
}
taosHashCleanup(pDb);
} else {
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
}
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
}
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
}
}
}
int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum;
if (tbNum > 0) {
if (tbNum > pJob->tbMetaNum && tbNum > pJob->tbHashNum) {
SHashObj* pTb = taosHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
SName* name = taosArrayGet(pReq->pTableMeta, i);
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
}
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
SName* name = taosArrayGet(pReq->pTableHash, i);
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
}
SName* name = taosHashIterate(pTb, NULL);
while (name) {
catalogRemoveTableMeta(pCtg, name);
name = taosHashIterate(pTb, name);
}
taosHashCleanup(pTb);
} else {
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
SName* name = taosArrayGet(pReq->pTableMeta, i);
catalogRemoveTableMeta(pCtg, name);
}
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
SName* name = taosArrayGet(pReq->pTableHash, i);
catalogRemoveTableMeta(pCtg, name);
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
int32_t code = 0;
......@@ -283,12 +368,13 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (pReq->forceUpdate) {
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, pJob, pReq));
}
int32_t taskIdx = 0;
for (int32_t i = 0; i < dbVgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
if (pReq->forceUpdate) {
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
}
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
}
......@@ -304,9 +390,6 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
for (int32_t i = 0; i < tbMetaNum; ++i) {
SName* name = taosArrayGet(pReq->pTableMeta, i);
if (pReq->forceUpdate) {
catalogRemoveTableMeta(pCtg, name);
}
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
}
......@@ -342,7 +425,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum);
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, *taskNum, pReq->forceUpdate);
return TSDB_CODE_SUCCESS;
......@@ -1108,7 +1191,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
qDebug("QID:%" PRIx64 " start to launch task %d", pJob->queryId, pTask->taskId);
qDebug("QID:0x%" PRIx64 " start to launch task %d", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
}
......
......@@ -177,7 +177,7 @@ int32_t ctgHandleMsgCallback(void *param, const SDataBuf *pMsg, int32_t rspCode)
SCtgTask *pTask = taosArrayGet(pJob->pTasks, cbParam->taskId);
qDebug("QID:%" PRIx64 " task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1));
qDebug("QID:0x%" PRIx64 " task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1));
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode));
......@@ -244,7 +244,7 @@ int32_t ctgAsyncSendMsg(CTG_PARAMS, SCtgTask* pTask, int32_t msgType, void *msg,
CTG_ERR_JRET(code);
}
ctgDebug("req msg sent, reqId:%" PRIx64 ", msg type:%d, %s", pTask->pJob->queryId, msgType, TMSG_INFO(msgType));
ctgDebug("req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pTask->pJob->queryId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS;
_return:
......@@ -565,7 +565,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pTrans, &vgroupInfo->epSet, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......
......@@ -19,6 +19,31 @@
#include "catalogInt.h"
#include "systable.h"
char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
switch (type) {
case CTG_TASK_GET_QNODE:
return "[get qnode list]";
case CTG_TASK_GET_DB_VGROUP:
return "[get db vgroup]";
case CTG_TASK_GET_DB_CFG:
return "[get db cfg]";
case CTG_TASK_GET_DB_INFO:
return "[get db info]";
case CTG_TASK_GET_TB_META:
return "[get table meta]";
case CTG_TASK_GET_TB_HASH:
return "[get table hash]";
case CTG_TASK_GET_INDEX:
return "[get index]";
case CTG_TASK_GET_UDF:
return "[get udf]";
case CTG_TASK_GET_USER:
return "[get user]";
default:
return "unknown";
}
}
void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pTableMeta);
pData->pTableMeta = NULL;
......@@ -477,6 +502,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
*pVgroup = *vgInfo;
ctgDebug("Got tb %s hash vgroup, vgId %d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
CTG_RET(code);
}
......
......@@ -258,6 +258,17 @@ static int32_t collectMetaKeyFromExplain(SCollectMetaKeyCxt* pCxt, SExplainStmt*
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStmt* pStmt) {
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
strcpy(name.dbname, pStmt->dbName);
strcpy(name.tname, pStmt->tableName);
int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) {
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
......@@ -381,6 +392,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromCreateTopic(pCxt, (SCreateTopicStmt*)pStmt);
case QUERY_NODE_EXPLAIN_STMT:
return collectMetaKeyFromExplain(pCxt, (SExplainStmt*)pStmt);
case QUERY_NODE_DESCRIBE_STMT:
return collectMetaKeyFromDescribe(pCxt, (SDescribeStmt*)pStmt);
case QUERY_NODE_CREATE_STREAM_STMT:
return collectMetaKeyFromCreateStream(pCxt, (SCreateStreamStmt*)pStmt);
case QUERY_NODE_SHOW_DNODES_STMT:
......
......@@ -2903,7 +2903,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
}
}
taosArrayDestroy(dbCfg.pRetensions);
return code;
}
......
......@@ -207,6 +207,13 @@ int32_t __catalogGetUdfInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps
return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo);
}
int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps,
const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta);
}
int32_t __catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { return 0; }
void initMetaDataEnv() {
g_mockCatalogService.reset(new MockCatalogService());
......@@ -221,6 +228,8 @@ void initMetaDataEnv() {
stub.set(catalogGetDBCfg, __catalogGetDBCfg);
stub.set(catalogChkAuth, __catalogChkAuth);
stub.set(catalogGetUdfInfo, __catalogGetUdfInfo);
stub.set(catalogRefreshGetTableMeta, __catalogRefreshGetTableMeta);
stub.set(catalogRemoveTableMeta, __catalogRemoveTableMeta);
// {
// AddrAny any("libcatalog.so");
// std::map<std::string,void*> result;
......
......@@ -21,7 +21,7 @@ namespace ParserTest {
class ParserInitialDTest : public ParserDdlTest {};
// DELETE FROM tb_name [WHERE condition]
// DELETE FROM table_name [WHERE condition]
TEST_F(ParserInitialDTest, delete) {
useDb("root", "test");
......@@ -40,7 +40,15 @@ TEST_F(ParserInitialDTest, deleteSemanticCheck) {
run("DELETE FROM t1 WHERE c1 > 10", TSDB_CODE_PAR_INVALID_DELETE_WHERE, PARSER_STAGE_TRANSLATE);
}
// todo desc
// DESC table_name
TEST_F(ParserInitialDTest, describe) {
useDb("root", "test");
run("DESC t1");
run("DESCRIBE st1");
}
// todo describe
// todo DROP account
......
......@@ -50,11 +50,13 @@ class ParserDdlTest : public ParserTestBase {
virtual void checkDdl(const SQuery* pQuery, ParserStage stage) {
ASSERT_NE(pQuery, nullptr);
ASSERT_EQ(pQuery->haveResultSet, false);
ASSERT_NE(pQuery->pRoot, nullptr);
ASSERT_EQ(pQuery->numOfResCols, 0);
ASSERT_EQ(pQuery->pResSchema, nullptr);
ASSERT_EQ(pQuery->precision, 0);
if (QUERY_EXEC_MODE_RPC == pQuery->execMode) {
ASSERT_EQ(pQuery->haveResultSet, false);
ASSERT_EQ(pQuery->numOfResCols, 0);
ASSERT_EQ(pQuery->pResSchema, nullptr);
ASSERT_EQ(pQuery->precision, 0);
}
if (nullptr != checkDdl_) {
checkDdl_(pQuery, stage);
}
......
......@@ -37,6 +37,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
pOut->dbVgroup->vgVersion = usedbRsp->vgVersion;
pOut->dbVgroup->hashMethod = usedbRsp->hashMethod;
qDebug("Got %d vgroup for db %s", usedbRsp->vgNum, usedbRsp->db);
if (usedbRsp->vgNum <= 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -50,6 +52,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
for (int32_t i = 0; i < usedbRsp->vgNum; ++i) {
SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i);
pOut->dbVgroup->numOfTable += pVgInfo->numOfTable;
qDebug("the %dth vgroup, id %d, epNum %d, current %s port %d", i, pVgInfo->vgId, pVgInfo->epSet.numOfEps,
pVgInfo->epSet.eps[pVgInfo->epSet.inUse].fqdn, pVgInfo->epSet.eps[pVgInfo->epSet.inUse].port);
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......
......@@ -108,7 +108,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN
pJob->refId = refId;
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId);
pJob->status = JOB_TASK_STATUS_NOT_START;
......
......@@ -118,6 +118,7 @@ if $data[0][5] != 5 then
return -1
endi
if $data[0][6] != 101 then
print expect 101, actual: $data06
return -1
endi
......
......@@ -7,57 +7,57 @@
#run tsim/table/basic1.sim
#run tsim/trans/lossdata1.sim
#run tsim/trans/create_db.sim
##run tsim/stable/alter_metrics.sim
##run tsim/stable/tag_modify.sim
##run tsim/stable/alter_comment.sim
##run tsim/stable/column_drop.sim
##run tsim/stable/column_modify.sim
##run tsim/stable/tag_rename.sim
#run tsim/stable/alter_metrics.sim
#run tsim/stable/tag_modify.sim
#run tsim/stable/alter_comment.sim
#run tsim/stable/column_drop.sim
#run tsim/stable/column_modify.sim
#run tsim/stable/tag_rename.sim
#run tsim/stable/vnode3.sim
#run tsim/stable/metrics.sim
##run tsim/stable/alter_insert2.sim
#run tsim/stable/alter_insert2.sim
#run tsim/stable/show.sim
#run tsim/stable/alter_import.sim
##run tsim/stable/tag_add.sim
#run tsim/stable/tag_add.sim
#run tsim/stable/tag_drop.sim
#run tsim/stable/column_add.sim
#run tsim/stable/alter_count.sim
#run tsim/stable/values.sim
#run tsim/stable/dnode3.sim
run tsim/stable/dnode3.sim
#run tsim/stable/alter_insert1.sim
#run tsim/stable/refcount.sim
#run tsim/stable/disk.sim
#run tsim/db/basic1.sim
#run tsim/db/basic3.sim
##run tsim/db/basic7.sim
#run tsim/db/basic6.sim
#run tsim/db/create_all_options.sim
#run tsim/db/basic2.sim
#run tsim/db/error1.sim
#run tsim/db/taosdlog.sim
#run tsim/db/alter_option.sim
#run tsim/mnode/basic1.sim
#run tsim/mnode/basic3.sim
#run tsim/mnode/basic2.sim
#run tsim/parser/fourArithmetic-basic.sim
#run tsim/parser/groupby-basic.sim
#run tsim/snode/basic1.sim
#run tsim/query/time_process.sim
#run tsim/query/stddev.sim
#run tsim/query/interval-offset.sim
#run tsim/query/charScalarFunction.sim
#run tsim/query/complex_select.sim
#run tsim/query/explain.sim
#run tsim/query/crash_sql.sim
#run tsim/query/diff.sim
#run tsim/query/complex_limit.sim
#run tsim/query/complex_having.sim
#run tsim/query/udf.sim
#run tsim/query/complex_group.sim
#run tsim/query/interval.sim
#run tsim/query/session.sim
#run tsim/query/scalarFunction.sim
##run tsim/query/scalarNull.sim
run tsim/db/basic1.sim
run tsim/db/basic3.sim
#run tsim/db/basic7.sim
run tsim/db/basic6.sim
run tsim/db/create_all_options.sim
run tsim/db/basic2.sim
run tsim/db/error1.sim
run tsim/db/taosdlog.sim
run tsim/db/alter_option.sim
run tsim/mnode/basic1.sim
run tsim/mnode/basic3.sim
run tsim/mnode/basic2.sim
run tsim/parser/fourArithmetic-basic.sim
run tsim/parser/groupby-basic.sim
run tsim/snode/basic1.sim
run tsim/query/time_process.sim
run tsim/query/stddev.sim
run tsim/query/interval-offset.sim
run tsim/query/charScalarFunction.sim
run tsim/query/complex_select.sim
run tsim/query/explain.sim
run tsim/query/crash_sql.sim
run tsim/query/diff.sim
run tsim/query/complex_limit.sim
run tsim/query/complex_having.sim
run tsim/query/udf.sim
run tsim/query/complex_group.sim
run tsim/query/interval.sim
run tsim/query/session.sim
run tsim/query/scalarFunction.sim
#run tsim/query/scalarNull.sim
run tsim/query/complex_where.sim
run tsim/tmq/basic1.sim
run tsim/tmq/basic4.sim
......
......@@ -1003,7 +1003,7 @@ int32_t shellExecute() {
taosSetSignal(SIGINT, shellSigintHandler);
shellGetGrantInfo(shell.conn);
shellGetGrantInfo();
while (1) {
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册