提交 e1881185 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

...@@ -118,3 +118,4 @@ contrib/* ...@@ -118,3 +118,4 @@ contrib/*
!contrib/test !contrib/test
sql sql
debug*/ debug*/
.env
\ No newline at end of file
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 8a5e336 GIT_TAG 3c7dafe
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taosws-rs # taosws-rs
ExternalProject_Add(taosws-rs ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git
GIT_TAG 648cc62 GIT_TAG 29424d5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
---
sidebar_label: 发布历史
title: 发布历史
---
import Release from "/components/Release";
<Release versionPrefix="3.0" />
...@@ -3055,6 +3055,7 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); ...@@ -3055,6 +3055,7 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
typedef struct { typedef struct {
int32_t msgIdx;
int32_t msgType; int32_t msgType;
int32_t msgLen; int32_t msgLen;
void* msg; void* msg;
...@@ -3068,6 +3069,7 @@ typedef struct { ...@@ -3068,6 +3069,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t reqType; int32_t reqType;
int32_t msgIdx;
int32_t msgLen; int32_t msgLen;
int32_t rspCode; int32_t rspCode;
void* msg; void* msg;
......
...@@ -58,12 +58,17 @@ typedef struct SDbInfo { ...@@ -58,12 +58,17 @@ typedef struct SDbInfo {
int64_t dbId; int64_t dbId;
} SDbInfo; } SDbInfo;
typedef struct STablesReq {
char dbFName[TSDB_DB_FNAME_LEN];
SArray* pTables;
} STablesReq;
typedef struct SCatalogReq { typedef struct SCatalogReq {
SArray* pDbVgroup; // element is db full name SArray* pDbVgroup; // element is db full name
SArray* pDbCfg; // element is db full name SArray* pDbCfg; // element is db full name
SArray* pDbInfo; // element is db full name SArray* pDbInfo; // element is db full name
SArray* pTableMeta; // element is SNAME SArray* pTableMeta; // element is STablesReq
SArray* pTableHash; // element is SNAME SArray* pTableHash; // element is STablesReq
SArray* pUdf; // element is udf name SArray* pUdf; // element is udf name
SArray* pIndex; // element is index name SArray* pIndex; // element is index name
SArray* pUser; // element is SUserAuthInfo SArray* pUser; // element is SUserAuthInfo
......
...@@ -103,7 +103,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -103,7 +103,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
* @return * @return
*/ */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model); qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model);
/** /**
* *
......
...@@ -43,11 +43,12 @@ typedef struct SRpcHandleInfo { ...@@ -43,11 +43,12 @@ typedef struct SRpcHandleInfo {
// rpc info // rpc info
void *handle; // rpc handle returned to app void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp)
int32_t persistHandle; // persist handle or not int8_t persistHandle; // persist handle or not
STraceId traceId;
int8_t hasEpSet; int8_t hasEpSet;
STraceId traceId;
// app info // app info
void *ahandle; // app handle set by client void *ahandle; // app handle set by client
void *wrapper; // wrapper handle void *wrapper; // wrapper handle
...@@ -69,8 +70,9 @@ typedef struct SRpcMsg { ...@@ -69,8 +70,9 @@ typedef struct SRpcMsg {
SRpcHandleInfo info; SRpcHandleInfo info;
} SRpcMsg; } SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef struct SRpcInit { typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN]; char localFqdn[TSDB_FQDN_LEN];
...@@ -84,12 +86,15 @@ typedef struct SRpcInit { ...@@ -84,12 +86,15 @@ typedef struct SRpcInit {
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg
RpcCfp cfp; RpcCfp cfp;
// user defined retry func // retry not not for particular msg
RpcRfp rfp; RpcRfp rfp;
// set up timeout for particular msg
RpcTfp tfp;
void *parent; void *parent;
} SRpcInit; } SRpcInit;
......
...@@ -46,6 +46,7 @@ int32_t* taosGetErrno(); ...@@ -46,6 +46,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) #define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) #define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018)
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019)
//common & util //common & util
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013) #define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013)
......
...@@ -109,6 +109,14 @@ static bool clientRpcRfp(int32_t code, tmsg_t msgType) { ...@@ -109,6 +109,14 @@ static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
} }
} }
// start timer for particular msgType
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE) {
return true;
}
return false;
}
// TODO refactor // TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit; SRpcInit rpcInit;
...@@ -118,6 +126,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -118,6 +126,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.numOfThreads = numOfThread; rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer; rpcInit.cfp = processMsgFromServer;
rpcInit.rfp = clientRpcRfp; rpcInit.rfp = clientRpcRfp;
rpcInit.tfp = clientRpcTfp;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user; rpcInit.user = (char *)user;
......
...@@ -1308,8 +1308,8 @@ int32_t doProcessMsgFromServer(void* param) { ...@@ -1308,8 +1308,8 @@ int32_t doProcessMsgFromServer(void* param) {
char tbuf[40] = {0}; char tbuf[40] = {0};
TRACE_TO_STR(trace, tbuf); TRACE_TO_STR(trace, tbuf);
tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle, tscDebug("processMsgFromServer handle %p, message: %s, size:%d, code: %s, gtid: %s", pMsg->info.handle,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), tbuf); TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code), tbuf);
if (pSendInfo->requestObjRefId != 0) { if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
...@@ -1922,7 +1922,7 @@ _OVER: ...@@ -1922,7 +1922,7 @@ _OVER:
return code; return code;
} }
int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str, int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
int32_t acctId, char* db) { int32_t acctId, char* db) {
SName name; SName name;
...@@ -1957,20 +1957,33 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i ...@@ -1957,20 +1957,33 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i
return -1; return -1;
} }
taosArrayPush(pList, &name); char dbFName[TSDB_DB_FNAME_LEN];
sprintf(dbFName, "%d.%.*s", acctId, dbLen, dbName);
STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
if (pDb) {
taosArrayPush(pDb->pTables, &name);
} else {
STablesReq db;
db.pTables = taosArrayInit(20, sizeof(SName));
strcpy(db.dbFName, dbFName);
taosArrayPush(db.pTables, &name);
taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) { int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
*pReq = taosArrayInit(10, sizeof(SName)); SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == *pReq) { if (NULL == pHash) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
bool inEscape = false; bool inEscape = false;
int32_t code = 0; int32_t code = 0;
void *pIter = NULL;
int32_t vIdx = 0; int32_t vIdx = 0;
int32_t vPos[2]; int32_t vPos[2];
...@@ -1985,7 +1998,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -1985,7 +1998,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
vLen[vIdx] = i - vPos[vIdx]; vLen[vIdx] = i - vPos[vIdx];
} }
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName); code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
if (code) { if (code) {
goto _return; goto _return;
} }
...@@ -2035,7 +2048,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -2035,7 +2048,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
vLen[vIdx] = i - vPos[vIdx]; vLen[vIdx] = i - vPos[vIdx];
} }
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName); code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
if (code) { if (code) {
goto _return; goto _return;
} }
...@@ -2067,14 +2080,31 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -2067,14 +2080,31 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
goto _return; goto _return;
} }
int32_t dbNum = taosHashGetSize(pHash);
*pReq = taosArrayInit(dbNum, sizeof(STablesReq));
pIter = taosHashIterate(pHash, NULL);
while (pIter) {
STablesReq* pDb = (STablesReq*)pIter;
taosArrayPush(*pReq, pDb);
pIter = taosHashIterate(pHash, pIter);
}
taosHashCleanup(pHash);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
terrno = TSDB_CODE_TSC_INVALID_OPERATION; terrno = TSDB_CODE_TSC_INVALID_OPERATION;
taosArrayDestroy(*pReq); pIter = taosHashIterate(pHash, NULL);
*pReq = NULL; while (pIter) {
STablesReq* pDb = (STablesReq*)pIter;
taosArrayDestroy(pDb->pTables);
pIter = taosHashIterate(pHash, pIter);
}
taosHashCleanup(pHash);
return terrno; return terrno;
} }
......
...@@ -308,9 +308,9 @@ static const SSysDbTableSchema offsetSchema[] = { ...@@ -308,9 +308,9 @@ static const SSysDbTableSchema offsetSchema[] = {
}; };
static const SSysDbTableSchema querySchema[] = { static const SSysDbTableSchema querySchema[] = {
{.name = "query_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "kill_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "query_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, {.name = "conn_id", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
......
...@@ -84,6 +84,9 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { ...@@ -84,6 +84,9 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
} }
for (int32_t i = 0; i < msgNum; ++i) { for (int32_t i = 0; i < msgNum; ++i) {
req.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgType); offset += sizeof(req.msgType);
...@@ -111,6 +114,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { ...@@ -111,6 +114,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
} else { } else {
rsp.rspCode = 0; rsp.rspCode = 0;
} }
rsp.msgIdx = req.msgIdx;
rsp.reqType = reqMsg.msgType; rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.info.rspLen; rsp.msgLen = reqMsg.info.rspLen;
rsp.msg = reqMsg.info.rsp; rsp.msg = reqMsg.info.rsp;
...@@ -136,6 +140,8 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { ...@@ -136,6 +140,8 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType); *(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType); offset += sizeof(p->reqType);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen); *(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen); offset += sizeof(p->msgLen);
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode); *(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
......
...@@ -223,6 +223,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -223,6 +223,8 @@ int vnodeCommit(SVnode *pVnode) {
vnodeBufPoolUnRef(pVnode->inUse); vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info // save info
info.config = pVnode->config; info.config = pVnode->config;
info.state.committed = pVnode->state.applied; info.state.committed = pVnode->state.applied;
......
...@@ -273,6 +273,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -273,6 +273,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
} }
for (int32_t i = 0; i < msgNum; ++i) { for (int32_t i = 0; i < msgNum; ++i) {
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgType); offset += sizeof(req.msgType);
...@@ -301,6 +304,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -301,6 +304,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
break; break;
} }
rsp.msgIdx = req.msgIdx;
rsp.reqType = reqMsg.msgType; rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.contLen; rsp.msgLen = reqMsg.contLen;
rsp.rspCode = reqMsg.code; rsp.rspCode = reqMsg.code;
...@@ -327,6 +331,8 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -327,6 +331,8 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType); *(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType); offset += sizeof(p->reqType);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen); *(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen); offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode); *(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
......
...@@ -32,6 +32,7 @@ extern "C" { ...@@ -32,6 +32,7 @@ extern "C" {
#define CTG_DEFAULT_RENT_SLOT_SIZE 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_DEFAULT_MAX_RETRY_TIMES 3 #define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_DEFAULT_BATCH_NUM 64 #define CTG_DEFAULT_BATCH_NUM 64
#define CTG_DEFAULT_FETCH_NUM 8
#define CTG_RENT_SLOT_SECOND 1.5 #define CTG_RENT_SLOT_SECOND 1.5
...@@ -80,6 +81,8 @@ typedef enum { ...@@ -80,6 +81,8 @@ typedef enum {
CTG_TASK_GET_UDF, CTG_TASK_GET_UDF,
CTG_TASK_GET_USER, CTG_TASK_GET_USER,
CTG_TASK_GET_SVR_VER, CTG_TASK_GET_SVR_VER,
CTG_TASK_GET_TB_META_BATCH,
CTG_TASK_GET_TB_HASH_BATCH,
} CTG_TASK_TYPE; } CTG_TASK_TYPE;
typedef enum { typedef enum {
...@@ -110,6 +113,23 @@ typedef struct SCtgTbMetaCtx { ...@@ -110,6 +113,23 @@ typedef struct SCtgTbMetaCtx {
int32_t flag; int32_t flag;
} SCtgTbMetaCtx; } SCtgTbMetaCtx;
typedef struct SCtgFetch {
int32_t dbIdx;
int32_t tbIdx;
int32_t fetchIdx;
int32_t resIdx;
int32_t flag;
SCtgTbCacheInfo tbInfo;
int32_t vgId;
} SCtgFetch;
typedef struct SCtgTbMetasCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
} SCtgTbMetasCtx;
typedef struct SCtgTbIndexCtx { typedef struct SCtgTbIndexCtx {
SName* pName; SName* pName;
} SCtgTbIndexCtx; } SCtgTbIndexCtx;
...@@ -137,6 +157,14 @@ typedef struct SCtgTbHashCtx { ...@@ -137,6 +157,14 @@ typedef struct SCtgTbHashCtx {
SName* pName; SName* pName;
} SCtgTbHashCtx; } SCtgTbHashCtx;
typedef struct SCtgTbHashsCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
} SCtgTbHashsCtx;
typedef struct SCtgIndexCtx { typedef struct SCtgIndexCtx {
char indexFName[TSDB_INDEX_FNAME_LEN]; char indexFName[TSDB_INDEX_FNAME_LEN];
} SCtgIndexCtx; } SCtgIndexCtx;
...@@ -211,6 +239,7 @@ typedef struct SCtgBatch { ...@@ -211,6 +239,7 @@ typedef struct SCtgBatch {
SRequestConnInfo conn; SRequestConnInfo conn;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
SArray* pTaskIds; SArray* pTaskIds;
SArray* pMsgIdxs;
} SCtgBatch; } SCtgBatch;
typedef struct SCtgJob { typedef struct SCtgJob {
...@@ -218,6 +247,7 @@ typedef struct SCtgJob { ...@@ -218,6 +247,7 @@ typedef struct SCtgJob {
int32_t batchId; int32_t batchId;
SHashObj* pBatchs; SHashObj* pBatchs;
SArray* pTasks; SArray* pTasks;
int32_t subTaskNum;
int32_t taskDone; int32_t taskDone;
SMetaData jobRes; SMetaData jobRes;
int32_t jobResCode; int32_t jobResCode;
...@@ -258,6 +288,7 @@ typedef struct SCtgTaskCallbackParam { ...@@ -258,6 +288,7 @@ typedef struct SCtgTaskCallbackParam {
SArray* taskId; SArray* taskId;
int32_t reqType; int32_t reqType;
int32_t batchId; int32_t batchId;
SArray* msgIdx;
} SCtgTaskCallbackParam; } SCtgTaskCallbackParam;
...@@ -276,6 +307,7 @@ typedef struct SCtgTask { ...@@ -276,6 +307,7 @@ typedef struct SCtgTask {
int32_t taskId; int32_t taskId;
SCtgJob* pJob; SCtgJob* pJob;
void* taskCtx; void* taskCtx;
SArray* msgCtxs;
SCtgMsgCtx msgCtx; SCtgMsgCtx msgCtx;
int32_t code; int32_t code;
void* res; void* res;
...@@ -286,9 +318,14 @@ typedef struct SCtgTask { ...@@ -286,9 +318,14 @@ typedef struct SCtgTask {
SHashObj* pBatchs; SHashObj* pBatchs;
} SCtgTask; } SCtgTask;
typedef struct SCtgTaskReq {
SCtgTask* pTask;
int32_t msgIdx;
} SCtgTaskReq;
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*); typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*); typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t); typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTaskReq*, int32_t, const SDataBuf *, int32_t);
typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*); typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*);
typedef int32_t (*ctgCloneTaskResFp)(SCtgTask*, void**); typedef int32_t (*ctgCloneTaskResFp)(SCtgTask*, void**);
typedef int32_t (*ctgCompTaskFp)(SCtgTask*, void*, bool*); typedef int32_t (*ctgCompTaskFp)(SCtgTask*, void*, bool*);
...@@ -487,6 +524,8 @@ typedef struct SCtgOperation { ...@@ -487,6 +524,8 @@ typedef struct SCtgOperation {
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) #define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) #define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_GET_TASK_MSGCTX(_task, _id) (((CTG_TASK_GET_TB_META_BATCH == (_task)->type) || (CTG_TASK_GET_TB_HASH_BATCH == (_task)->type)) ? taosArrayGet((_task)->msgCtxs, (_id)) : &(_task)->msgCtx)
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) #define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
...@@ -586,6 +625,7 @@ int32_t ctgdShowCacheInfo(void); ...@@ -586,6 +625,7 @@ int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq); int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList);
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action); int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action); int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
...@@ -631,7 +671,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg ...@@ -631,7 +671,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target); int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask); int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTaskReq* tReq);
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask); int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask); int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask); int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
...@@ -639,9 +679,9 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -639,9 +679,9 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask); int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask); int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask); int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask); int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask); int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask); int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
...@@ -664,6 +704,7 @@ void ctgFreeJob(void* job); ...@@ -664,6 +704,7 @@ void ctgFreeJob(void* job);
void ctgFreeHandleImpl(SCatalog* pCtg); void ctgFreeHandleImpl(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo); void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup); int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update);
void ctgResetTbMetaTask(SCtgTask* pTask); void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache *dbCache); void ctgFreeDbCache(SCtgDBCache *dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2); int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
...@@ -672,8 +713,11 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2); ...@@ -672,8 +713,11 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2); int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput); void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target); int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target);
char * ctgTaskTypeStr(CTG_TASK_TYPE type); char * ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId); int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId);
int32_t ctgGetTablesReqNum(SArray *pList);
int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fetchIdx, int32_t resIdx, int32_t flag);
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes); int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
void ctgFreeSTableIndex(void *info); void ctgFreeSTableIndex(void *info);
void ctgClearSubTaskRes(SCtgSubRes *pRes); void ctgClearSubTaskRes(SCtgSubRes *pRes);
...@@ -682,6 +726,7 @@ void ctgClearHandle(SCatalog* pCtg); ...@@ -682,6 +726,7 @@ void ctgClearHandle(SCatalog* pCtg);
void ctgFreeTbCacheImpl(SCtgTbCache *pCache); void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup); int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup);
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;
......
此差异已折叠。
...@@ -242,7 +242,6 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S ...@@ -242,7 +242,6 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S
goto _return; goto _return;
} }
int32_t sz = 0;
pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName)); pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) { if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName); ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
...@@ -282,7 +281,6 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid, ...@@ -282,7 +281,6 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid,
goto _return; goto _return;
} }
int32_t sz = 0;
char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid)); char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
if (NULL == stName) { if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName); ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
...@@ -2152,6 +2150,254 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet ...@@ -2152,6 +2150,254 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
int32_t tbNum = taosArrayGetSize(ctx->pNames);
SName* fName = taosArrayGet(ctx->pNames, 0);
int32_t fIdx = 0;
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(ctx->pNames, i);
SCtgTbMetaCtx nctx = {0};
nctx.flag = CTG_FLAG_UNKNOWN_STB;
nctx.pName = pName;
if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(nctx.flag);
}
STableMeta *pTableMeta = NULL;
CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));
SMetaRes res = {0};
if (pTableMeta) {
if (CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)))) {
res.pRes = pTableMeta;
} else {
taosMemoryFreeClear(pTableMeta);
}
}
if (NULL == res.pRes) {
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
}
SCtgFetch fetch = {0};
fetch.tbIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = nctx.flag;
taosArrayPush(ctx->pFetchs, &fetch);
}
taosArrayPush(ctx->pResList, &res);
}
if (NULL == ctx->pFetchs) {
TSWAP(*pResList, ctx->pResList);
}
return TSDB_CODE_SUCCESS;
}
#endif
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList) {
int32_t tbNum = taosArrayGetSize(pList);
SName* pName = taosArrayGet(pList, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0;
STableMeta* lastTableMeta = NULL;
if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(flag);
strcpy(dbFName, pName->dbname);
} else {
tNameGetFullDbName(pName, dbFName);
}
SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) {
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
}
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(pList, i);
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
continue;
}
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
continue;
}
STableMeta* tbMeta = pCache->pMeta;
SCtgTbMetaCtx nctx = {0};
nctx.flag = flag;
nctx.tbInfo.inCache = true;
nctx.tbInfo.dbId = dbCache->dbId;
nctx.tbInfo.suid = tbMeta->suid;
nctx.tbInfo.tbType = tbMeta->tableType;
SMetaRes res = {0};
STableMeta* pTableMeta = NULL;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
int32_t metaSize = CTG_META_SIZE(tbMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
continue;
}
// PROCESS FOR CHILD TABLE
if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
cloneTableMeta(lastTableMeta, &pTableMeta);
memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
continue;
}
int32_t metaSize = sizeof(SCTableMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s",
pName->tname, nctx.tbInfo.tbType, dbFName);
char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
}
pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
if (NULL == pCache) {
ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
taosHashRelease(dbCache->stbCache, stName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
}
taosHashRelease(dbCache->stbCache, stName);
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
}
STableMeta* stbMeta = pCache->pMeta;
if (stbMeta->suid != nctx.tbInfo.suid) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
}
metaSize = CTG_META_SIZE(stbMeta);
pTableMeta = taosMemoryRealloc(pTableMeta, metaSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
lastSuid = pTableMeta->suid;
lastTableMeta = pTableMeta;
}
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) { int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
int32_t code = 0; int32_t code = 0;
......
此差异已折叠。
...@@ -26,6 +26,7 @@ void ctgFreeMsgSendParam(void* param) { ...@@ -26,6 +26,7 @@ void ctgFreeMsgSendParam(void* param) {
SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param; SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param;
taosArrayDestroy(pParam->taskId); taosArrayDestroy(pParam->taskId);
taosArrayDestroy(pParam->msgIdx);
taosMemoryFree(param); taosMemoryFree(param);
} }
...@@ -88,6 +89,10 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) { ...@@ -88,6 +89,10 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
return "[get user]"; return "[get user]";
case CTG_TASK_GET_SVR_VER: case CTG_TASK_GET_SVR_VER:
return "[get svr ver]"; return "[get svr ver]";
case CTG_TASK_GET_TB_META_BATCH:
return "[bget table meta]";
case CTG_TASK_GET_TB_HASH_BATCH:
return "[bget table hash]";
default: default:
return "unknown"; return "unknown";
} }
...@@ -460,6 +465,25 @@ void ctgResetTbMetaTask(SCtgTask* pTask) { ...@@ -460,6 +465,25 @@ void ctgResetTbMetaTask(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->res); taosMemoryFreeClear(pTask->res);
} }
void ctgFreeBatchMeta(void* meta) {
if (NULL == meta) {
return;
}
SMetaRes* pRes = (SMetaRes*)meta;
taosMemoryFreeClear(pRes->pRes);
}
void ctgFreeBatchHash(void* hash) {
if (NULL == hash) {
return;
}
SMetaRes* pRes = (SMetaRes*)hash;
taosMemoryFreeClear(pRes->pRes);
}
void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
switch (type) { switch (type) {
case CTG_TASK_GET_QNODE: case CTG_TASK_GET_QNODE:
...@@ -500,6 +524,24 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { ...@@ -500,6 +524,24 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
taosMemoryFreeClear(*pRes); taosMemoryFreeClear(*pRes);
break; break;
} }
case CTG_TASK_GET_TB_META_BATCH: {
SArray* pArray = (SArray*)*pRes;
int32_t num = taosArrayGetSize(pArray);
for (int32_t i = 0; i < num; ++i) {
ctgFreeBatchMeta(taosArrayGet(pArray, i));
}
*pRes = NULL; // no need to free it
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
SArray* pArray = (SArray*)*pRes;
int32_t num = taosArrayGetSize(pArray);
for (int32_t i = 0; i < num; ++i) {
ctgFreeBatchHash(taosArrayGet(pArray, i));
}
*pRes = NULL; // no need to free it
break;
}
default: default:
qError("invalid task type %d", type); qError("invalid task type %d", type);
break; break;
...@@ -554,6 +596,16 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) { ...@@ -554,6 +596,16 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
taosMemoryFreeClear(*pRes); taosMemoryFreeClear(*pRes);
break; break;
} }
case CTG_TASK_GET_TB_META_BATCH: {
taosArrayDestroyEx(*pRes, ctgFreeBatchMeta);
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
taosArrayDestroyEx(*pRes, ctgFreeBatchHash);
*pRes = NULL;
break;
}
default: default:
qError("invalid task type %d", type); qError("invalid task type %d", type);
break; break;
...@@ -583,12 +635,38 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { ...@@ -583,12 +635,38 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->taskCtx); taosMemoryFreeClear(pTask->taskCtx);
break; break;
} }
case CTG_TASK_GET_TB_META_BATCH: {
SCtgTbMetasCtx* taskCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchMeta);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);
if (pTask->msgCtx.lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
pTask->msgCtx.lastOut = NULL;
}
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_HASH: { case CTG_TASK_GET_TB_HASH: {
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName); taosMemoryFreeClear(taskCtx->pName);
taosMemoryFreeClear(pTask->taskCtx); taosMemoryFreeClear(pTask->taskCtx);
break; break;
} }
case CTG_TASK_GET_TB_HASH_BATCH: {
SCtgTbHashsCtx* taskCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchHash);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_INDEX: { case CTG_TASK_GET_TB_INDEX: {
SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx; SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName); taosMemoryFreeClear(taskCtx->pName);
...@@ -679,6 +757,23 @@ int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* targ ...@@ -679,6 +757,23 @@ int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* targ
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target) {
SCtgMsgCtx ctx = {0};
ctx.reqType = reqType;
ctx.out = out;
if (target) {
ctx.target = strdup(target);
if (NULL == ctx.target) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
taosArrayPush(pCtxs, &ctx);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
switch (hashMethod) { switch (hashMethod) {
...@@ -780,6 +875,104 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName ...@@ -780,6 +875,104 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) {
int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SMetaRes res = {0};
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
tableNameHashFp fp = NULL;
SVgroupInfo *vgInfo = NULL;
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
int32_t tbNum = taosArrayGetSize(pNames);
if (1 == vgNum) {
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
for (int32_t i = 0; i < tbNum; ++i) {
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == vgInfo) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*vgInfo = *(SVgroupInfo*)pIter;
ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = vgInfo;
} else {
res.pRes = vgInfo;
taosArrayPush(pCtx->pResList, &res);
}
}
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFullName, "%s.", dbFName);
int32_t offset = strlen(tbFullName);
SName* pName = NULL;
int32_t tbNameLen = 0;
for (int32_t i = 0; i < tbNum; ++i) {
pName = taosArrayGet(pNames, i);
tbNameLen = offset + strlen(pName->tname);
strcpy(tbFullName + offset, pName->tname);
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen);
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
taosHashCancelIterate(dbInfo->vgHash, pIter);
break;
}
pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL;
}
if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == pNewVg) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*pNewVg = *vgInfo;
ctgDebug("Got tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = pNewVg;
} else {
res.pRes = pNewVg;
taosArrayPush(pCtx->pResList, &res);
}
}
CTG_RET(code);
}
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) { int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableVersion*)key2)->suid) { if (*(uint64_t *)key1 < ((SSTableVersion*)key2)->suid) {
return -1; return -1;
...@@ -921,4 +1114,41 @@ int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, cha ...@@ -921,4 +1114,41 @@ int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, cha
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTablesReqNum(SArray *pList) {
if (NULL == pList) {
return 0;
}
int32_t total = 0;
int32_t n = taosArrayGetSize(pList);
for (int32_t i = 0; i < n; ++i) {
STablesReq *pReq = taosArrayGet(pList, i);
total += taosArrayGetSize(pReq->pTables);
}
return total;
}
int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fetchIdx, int32_t resIdx, int32_t flag) {
if (NULL == (*pFetchs)) {
*pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.dbIdx = dbIdx;
fetch.tbIdx = tbIdx;
fetch.fetchIdx = (*fetchIdx)++;
fetch.resIdx = resIdx;
fetch.flag = flag;
taosArrayPush(*pFetchs, &fetch);
return TSDB_CODE_SUCCESS;
}
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx);
return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx);
}
...@@ -737,6 +737,9 @@ typedef struct STimeSliceOperatorInfo { ...@@ -737,6 +737,9 @@ typedef struct STimeSliceOperatorInfo {
SInterval interval; SInterval interval;
int64_t current; int64_t current;
SArray* pPrevRow; // SArray<SGroupValue> SArray* pPrevRow; // SArray<SGroupValue>
SArray* pNextRow; // SArray<SGroupValue>
bool isPrevRowSet;
bool isNextRowSet;
int32_t fillType; // fill type int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation SExprSupp scalarSup; // scalar calculation
...@@ -987,7 +990,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length); ...@@ -987,7 +990,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model); char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
......
...@@ -341,7 +341,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table ...@@ -341,7 +341,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
} }
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) { qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
assert(pSubplan != NULL); assert(pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
......
...@@ -4492,7 +4492,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT ...@@ -4492,7 +4492,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
} }
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model) { char* sql, EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId; uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -4503,6 +4503,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4503,6 +4503,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
} }
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;
sql = NULL;
(*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
...@@ -4515,6 +4516,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4515,6 +4516,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
return code; return code;
_complete: _complete:
taosMemoryFree(sql);
doDestroyTask(*pTaskInfo); doDestroyTask(*pTaskInfo);
terrno = code; terrno = code;
return code; return code;
......
...@@ -2065,10 +2065,30 @@ static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock ...@@ -2065,10 +2065,30 @@ static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
memcpy(pkey->pData, val, pkey->bytes); memcpy(pkey->pData, val, pkey->bytes);
} }
} }
pSliceInfo->isPrevRowSet = true;
}
static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
// null data should not be kept since it can not be used to perform interpolation
if (!colDataIsNull_s(pColInfoData, i)) {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i);
pkey->isNull = false;
char* val = colDataGetData(pColInfoData, rowIndex);
memcpy(pkey->pData, val, pkey->bytes);
}
}
pSliceInfo->isNextRowSet = true;
} }
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock, static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock,
int32_t rowIndex, SSDataBlock* pResBlock) { SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows; int32_t rows = pResBlock->info.rows;
// todo set the correct primary timestamp column // todo set the correct primary timestamp column
...@@ -2144,6 +2164,10 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2144,6 +2164,10 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break; break;
} }
case TSDB_FILL_PREV: { case TSDB_FILL_PREV: {
if (!pSliceInfo->isPrevRowSet) {
break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, rows, pkey->pData, false); colDataAppend(pDst, rows, pkey->pData, false);
pResBlock->info.rows += 1; pResBlock->info.rows += 1;
...@@ -2151,8 +2175,12 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2151,8 +2175,12 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
} }
case TSDB_FILL_NEXT: { case TSDB_FILL_NEXT: {
char* p = colDataGetData(pSrc, rowIndex); if (!pSliceInfo->isNextRowSet) {
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex)); break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
colDataAppend(pDst, rows, pkey->pData, false);
pResBlock->info.rows += 1; pResBlock->info.rows += 1;
break; break;
} }
...@@ -2186,6 +2214,35 @@ static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB ...@@ -2186,6 +2214,35 @@ static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
taosArrayPush(pInfo->pPrevRow, &key); taosArrayPush(pInfo->pPrevRow, &key);
} }
pInfo->isPrevRowSet = false;
return TSDB_CODE_SUCCESS;
}
static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
if (pInfo->pNextRow != NULL) {
return TSDB_CODE_SUCCESS;
}
pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys));
if (pInfo->pNextRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SGroupKeys key = {0};
key.bytes = pColInfo->info.bytes;
key.type = pColInfo->info.type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
taosArrayPush(pInfo->pNextRow, &key);
}
pInfo->isNextRowSet = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2215,14 +2272,19 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2215,14 +2272,19 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock); blockDataCleanup(pResBlock);
//int32_t numOfRows = 0;
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
int32_t code = initPrevRowsKeeper(pSliceInfo, pBlock); int32_t code;
code = initPrevRowsKeeper(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
code = initNextRowsKeeper(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
...@@ -2244,7 +2306,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2244,7 +2306,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
char* v = colDataGetData(pSrc, i); char* v = colDataGetData(pSrc, i);
//colDataAppend(pDst, numOfRows, v, false);
colDataAppend(pDst, pResBlock->info.rows, v, false); colDataAppend(pDst, pResBlock->info.rows, v, false);
} }
...@@ -2262,11 +2323,16 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2262,11 +2323,16 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break; break;
} }
} else if (ts < pSliceInfo->current) { } else if (ts < pSliceInfo->current) {
// in case interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
if (i < pBlock->info.rows - 1) { if (i < pBlock->info.rows - 1) {
// in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1);
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
...@@ -2285,8 +2351,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2285,8 +2351,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
doKeepPrevRows(pSliceInfo, pBlock, i); doKeepPrevRows(pSliceInfo, pBlock, i);
} }
} else { // ts > pSliceInfo->current } else { // ts > pSliceInfo->current
// in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
...@@ -2326,9 +2395,10 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2326,9 +2395,10 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
} }
//check if need to interpolate after ts range // check if need to interpolate after ts range
while (pSliceInfo->current <= pSliceInfo->win.ekey) { // except for fill(next)
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pBlock->info.rows - 1, pResBlock); while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
......
...@@ -39,6 +39,11 @@ typedef struct SMsgBuf { ...@@ -39,6 +39,11 @@ typedef struct SMsgBuf {
char* buf; char* buf;
} SMsgBuf; } SMsgBuf;
typedef struct SParseTablesMetaReq {
char dbFName[TSDB_DB_FNAME_LEN];
SHashObj* pTables;
} SParseTablesMetaReq;
typedef struct SParseMetaCache { typedef struct SParseMetaCache {
SHashObj* pTableMeta; // key is tbFName, element is STableMeta* SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>* SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
...@@ -95,7 +100,7 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun ...@@ -95,7 +100,7 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes); int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput); int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes); int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache); void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1253,20 +1253,21 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p ...@@ -1253,20 +1253,21 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
if (!fmIsRepeatScanFunc(pFunc->funcId)) { if (!fmIsRepeatScanFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (isSelectStmt(pCxt->pCurrStmt)) { if (!isSelectStmt(pCxt->pCurrStmt)) {
// select percentile() without from clause is also valid return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
if (NULL == ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) { "%s is only supported in single table query", pFunc->functionName);
return TSDB_CODE_SUCCESS;
}
SNode* pTable = ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable;
if (QUERY_NODE_REAL_TABLE == nodeType(pTable) &&
(TSDB_CHILD_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType ||
TSDB_NORMAL_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType)) {
return TSDB_CODE_SUCCESS;
}
} }
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SNode* pTable = pSelect->pFromTable;
// select percentile() without from clause is also valid
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) ||
NULL != pSelect->pPartitionByList) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName); "%s is only supported in single table query", pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
} }
static bool isStar(SNode* pNode) { static bool isStar(SNode* pNode) {
...@@ -2509,9 +2510,31 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) { ...@@ -2509,9 +2510,31 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static int32_t translateStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) { static bool isPartitionByTbname(SNodeList* pPartitionByList) {
if (1 != LIST_LENGTH(pPartitionByList)) {
return false;
}
SNode* pPartKey = nodesListGetNode(pPartitionByList, 0);
return QUERY_NODE_FUNCTION != nodeType(pPartKey) || FUNCTION_TYPE_TBNAME != ((SFunctionNode*)pPartKey)->funcType;
}
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pCxt->createStream) {
return TSDB_CODE_SUCCESS;
}
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!isPartitionByTbname(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
SStateWindowNode* pState = (SStateWindowNode*)pSelect->pWindow;
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt); nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
// todo check for "function not support for state_window" if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = checkStateWindowForStream(pCxt, pSelect);
}
return pCxt->errCode; return pCxt->errCode;
} }
...@@ -2522,14 +2545,13 @@ static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNod ...@@ -2522,14 +2545,13 @@ static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNod
if (PRIMARYKEY_TIMESTAMP_COL_ID != pSession->pCol->colId) { if (PRIMARYKEY_TIMESTAMP_COL_ID != pSession->pCol->colId) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_COL); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_COL);
} }
// todo check for "function not support for session"
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
switch (nodeType(pSelect->pWindow)) { switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW: case QUERY_NODE_STATE_WINDOW:
return translateStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow); return translateStateWindow(pCxt, pSelect);
case QUERY_NODE_SESSION_WINDOW: case QUERY_NODE_SESSION_WINDOW:
return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow); return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
case QUERY_NODE_INTERVAL_WINDOW: case QUERY_NODE_INTERVAL_WINDOW:
...@@ -4708,7 +4730,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt ...@@ -4708,7 +4730,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
} }
} }
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
} }
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) { static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
......
...@@ -474,6 +474,24 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) { ...@@ -474,6 +474,24 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) {
if (NULL != pDbsHash) {
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL);
while (NULL != p) {
STablesReq req = {0};
strcpy(req.dbFName, p->dbFName);
buildTableReq(p->pTables, &req.pTables);
taosArrayPush(*pDbs, &req);
p = taosHashIterate(pDbsHash, p);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) { static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) {
if (NULL != pUserAuthHash) { if (NULL != pUserAuthHash) {
*pUserAuth = taosArrayInit(taosHashGetSize(pUserAuthHash), sizeof(SUserAuthInfo)); *pUserAuth = taosArrayInit(taosHashGetSize(pUserAuthHash), sizeof(SUserAuthInfo));
...@@ -513,12 +531,12 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) { ...@@ -513,12 +531,12 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
} }
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) { int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
int32_t code = buildTableReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta); int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup); code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildTableReq(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash); code = buildTableReqFromDb(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildDbReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg); code = buildDbReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg);
...@@ -587,6 +605,24 @@ static int32_t putDbDataToCache(const SArray* pDbReq, const SArray* pDbData, SHa ...@@ -587,6 +605,24 @@ static int32_t putDbDataToCache(const SArray* pDbReq, const SArray* pDbData, SHa
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t putDbTableDataToCache(const SArray* pDbReq, const SArray* pTableData, SHashObj** pTable) {
int32_t ndbs = taosArrayGetSize(pDbReq);
int32_t tableNo = 0;
for (int32_t i = 0; i < ndbs; ++i) {
STablesReq* pReq = taosArrayGet(pDbReq, i);
int32_t ntables = taosArrayGetSize(pReq->pTables);
for (int32_t j = 0; j < ntables; ++j) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(taosArrayGet(pReq->pTables, j), fullName);
if (TSDB_CODE_SUCCESS != putMetaDataToHash(fullName, strlen(fullName), pTableData, tableNo, pTable)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
++tableNo;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUserAuthData, SHashObj** pUserAuth) { static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUserAuthData, SHashObj** pUserAuth) {
int32_t nvgs = taosArrayGetSize(pUserAuthReq); int32_t nvgs = taosArrayGetSize(pUserAuthReq);
for (int32_t i = 0; i < nvgs; ++i) { for (int32_t i = 0; i < nvgs; ++i) {
...@@ -612,12 +648,12 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas ...@@ -612,12 +648,12 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
} }
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) { int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t code = putTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta); int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup); code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = putTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, &pMetaCache->pTableVgroup); code = putDbTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, &pMetaCache->pTableVgroup);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = putDbDataToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, &pMetaCache->pDbCfg); code = putDbDataToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, &pMetaCache->pDbCfg);
...@@ -657,14 +693,38 @@ static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const cha ...@@ -657,14 +693,38 @@ static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const cha
return reserveTableReqInCacheImpl(fullName, len, pTables); return reserveTableReqInCacheImpl(fullName, len, pTables);
} }
static int32_t reserveTableReqInDbCacheImpl(int32_t acctId, const char* pDb, const char* pTable, SHashObj* pDbs) {
SParseTablesMetaReq req = {0};
int32_t len = snprintf(req.dbFName, sizeof(req.dbFName), "%d.%s", acctId, pDb);
int32_t code = reserveTableReqInCache(acctId, pDb, pTable, &req.pTables);
if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pDbs, req.dbFName, len, &req, sizeof(SParseTablesMetaReq));
}
return code;
}
static int32_t reserveTableReqInDbCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pDbs) {
if (NULL == *pDbs) {
*pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
char fullName[TSDB_DB_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s", acctId, pDb);
SParseTablesMetaReq* pReq = taosHashGet(*pDbs, fullName, len);
if (NULL == pReq) {
return reserveTableReqInDbCacheImpl(acctId, pDb, pTable, *pDbs);
}
return reserveTableReqInCache(acctId, pDb, pTable, &pReq->pTables);
}
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta); return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableMeta);
} }
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) { int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
char fullName[TSDB_TABLE_FNAME_LEN]; return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta);
tNameExtractFullName(pName, fullName);
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableMeta);
} }
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) { int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
...@@ -711,13 +771,11 @@ int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, ...@@ -711,13 +771,11 @@ int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName,
} }
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup); return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup);
} }
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) { int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
char fullName[TSDB_TABLE_FNAME_LEN]; return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableVgroup);
tNameExtractFullName(pName, fullName);
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableVgroup);
} }
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) { int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
...@@ -919,10 +977,24 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) { ...@@ -919,10 +977,24 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void destoryParseMetaCache(SParseMetaCache* pMetaCache) { void destoryParseTablesMetaReqHash(SHashObj* pHash) {
SParseTablesMetaReq* p = taosHashIterate(pHash, NULL);
while (NULL != p) {
taosHashCleanup(p->pTables);
p = taosHashIterate(pHash, p);
}
taosHashCleanup(pHash);
}
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
if (request) {
destoryParseTablesMetaReqHash(pMetaCache->pTableMeta);
destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup);
} else {
taosHashCleanup(pMetaCache->pTableMeta); taosHashCleanup(pMetaCache->pTableMeta);
taosHashCleanup(pMetaCache->pDbVgroup);
taosHashCleanup(pMetaCache->pTableVgroup); taosHashCleanup(pMetaCache->pTableVgroup);
}
taosHashCleanup(pMetaCache->pDbVgroup);
taosHashCleanup(pMetaCache->pDbCfg); taosHashCleanup(pMetaCache->pDbCfg);
taosHashCleanup(pMetaCache->pDbInfo); taosHashCleanup(pMetaCache->pDbInfo);
taosHashCleanup(pMetaCache->pUserAuth); taosHashCleanup(pMetaCache->pUserAuth);
......
...@@ -187,7 +187,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq ...@@ -187,7 +187,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(&metaCache, pCatalogReq); code = buildCatalogReq(&metaCache, pCatalogReq);
} }
destoryParseMetaCache(&metaCache); destoryParseMetaCache(&metaCache, true);
terrno = code; terrno = code;
return code; return code;
} }
...@@ -203,7 +203,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata ...@@ -203,7 +203,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
code = analyseSemantic(pCxt, pQuery, &metaCache); code = analyseSemantic(pCxt, pQuery, &metaCache);
} }
} }
destoryParseMetaCache(&metaCache); destoryParseMetaCache(&metaCache, false);
terrno = code; terrno = code;
return code; return code;
} }
......
...@@ -472,28 +472,36 @@ class MockCatalogServiceImpl { ...@@ -472,28 +472,36 @@ class MockCatalogServiceImpl {
int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const { int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const {
if (NULL != pTableMetaReq) { if (NULL != pTableMetaReq) {
int32_t ntables = taosArrayGetSize(pTableMetaReq); int32_t ndbs = taosArrayGetSize(pTableMetaReq);
*pTableMetaData = taosArrayInit(ntables, sizeof(SMetaRes)); *pTableMetaData = taosArrayInit(ndbs, sizeof(SMetaRes));
for (int32_t i = 0; i < ntables; ++i) { for (int32_t i = 0; i < ndbs; ++i) {
STablesReq* pReq = (STablesReq*)taosArrayGet(pTableMetaReq, i);
int32_t ntables = taosArrayGetSize(pReq->pTables);
for (int32_t j = 0; j < ntables; ++j) {
SMetaRes res = {0}; SMetaRes res = {0};
res.code = catalogGetTableMeta((const SName*)taosArrayGet(pTableMetaReq, i), (STableMeta**)&res.pRes); res.code = catalogGetTableMeta((const SName*)taosArrayGet(pReq->pTables, j), (STableMeta**)&res.pRes);
taosArrayPush(*pTableMetaData, &res); taosArrayPush(*pTableMetaData, &res);
} }
} }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getAllTableVgroup(SArray* pTableVgroupReq, SArray** pTableVgroupData) const { int32_t getAllTableVgroup(SArray* pTableVgroupReq, SArray** pTableVgroupData) const {
if (NULL != pTableVgroupReq) { if (NULL != pTableVgroupReq) {
int32_t ntables = taosArrayGetSize(pTableVgroupReq); int32_t ndbs = taosArrayGetSize(pTableVgroupReq);
*pTableVgroupData = taosArrayInit(ntables, sizeof(SMetaRes)); *pTableVgroupData = taosArrayInit(ndbs, sizeof(SMetaRes));
for (int32_t i = 0; i < ntables; ++i) { for (int32_t i = 0; i < ndbs; ++i) {
STablesReq* pReq = (STablesReq*)taosArrayGet(pTableVgroupReq, i);
int32_t ntables = taosArrayGetSize(pReq->pTables);
for (int32_t j = 0; j < ntables; ++j) {
SMetaRes res = {0}; SMetaRes res = {0};
res.pRes = taosMemoryCalloc(1, sizeof(SVgroupInfo)); res.pRes = taosMemoryCalloc(1, sizeof(SVgroupInfo));
res.code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pTableVgroupReq, i), (SVgroupInfo*)res.pRes); res.code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pReq->pTables, j), (SVgroupInfo*)res.pRes);
taosArrayPush(*pTableVgroupData, &res); taosArrayPush(*pTableVgroupData, &res);
} }
} }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -677,12 +685,17 @@ int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SM ...@@ -677,12 +685,17 @@ int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SM
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData); return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
} }
void MockCatalogService::destoryTablesReq(void* p) {
STablesReq* pRes = (STablesReq*)p;
taosArrayDestroy(pRes->pTables);
}
void MockCatalogService::destoryCatalogReq(SCatalogReq* pReq) { void MockCatalogService::destoryCatalogReq(SCatalogReq* pReq) {
taosArrayDestroy(pReq->pDbVgroup); taosArrayDestroy(pReq->pDbVgroup);
taosArrayDestroy(pReq->pDbCfg); taosArrayDestroy(pReq->pDbCfg);
taosArrayDestroy(pReq->pDbInfo); taosArrayDestroy(pReq->pDbInfo);
taosArrayDestroy(pReq->pTableMeta); taosArrayDestroyEx(pReq->pTableMeta, destoryTablesReq);
taosArrayDestroy(pReq->pTableHash); taosArrayDestroyEx(pReq->pTableHash, destoryTablesReq);
taosArrayDestroy(pReq->pUdf); taosArrayDestroy(pReq->pUdf);
taosArrayDestroy(pReq->pIndex); taosArrayDestroy(pReq->pIndex);
taosArrayDestroy(pReq->pUser); taosArrayDestroy(pReq->pUser);
......
...@@ -50,6 +50,7 @@ struct MockTableMeta { ...@@ -50,6 +50,7 @@ struct MockTableMeta {
class MockCatalogServiceImpl; class MockCatalogServiceImpl;
class MockCatalogService { class MockCatalogService {
public: public:
static void destoryTablesReq(void* p);
static void destoryCatalogReq(SCatalogReq* pReq); static void destoryCatalogReq(SCatalogReq* pReq);
static void destoryMetaRes(void* p); static void destoryMetaRes(void* p);
static void destoryMetaArrayRes(void* p); static void destoryMetaArrayRes(void* p);
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <functional>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "mockCatalogService.h" #include "mockCatalogService.h"
...@@ -20,6 +22,7 @@ ...@@ -20,6 +22,7 @@
#include "parInt.h" #include "parInt.h"
using namespace std; using namespace std;
using namespace std::placeholders;
using namespace testing; using namespace testing;
namespace { namespace {
...@@ -63,7 +66,9 @@ class InsertTest : public Test { ...@@ -63,7 +66,9 @@ class InsertTest : public Test {
int32_t runAsync() { int32_t runAsync() {
cxt_.async = true; cxt_.async = true;
unique_ptr<SParseMetaCache, void (*)(SParseMetaCache*)> metaCache(new SParseMetaCache(), _destoryParseMetaCache); bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), std::bind(_destoryParseMetaCache, _1, cref(request)));
code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get()); code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get());
if (code_ != TSDB_CODE_SUCCESS) { if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
...@@ -81,6 +86,8 @@ class InsertTest : public Test { ...@@ -81,6 +86,8 @@ class InsertTest : public Test {
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData); unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get()); g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get());
metaCache.reset(new SParseMetaCache());
request = false;
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get()); code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
if (code_ != TSDB_CODE_SUCCESS) { if (code_ != TSDB_CODE_SUCCESS) {
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
...@@ -144,8 +151,8 @@ class InsertTest : public Test { ...@@ -144,8 +151,8 @@ class InsertTest : public Test {
static const int max_err_len = 1024; static const int max_err_len = 1024;
static const int max_sql_len = 1024 * 1024; static const int max_sql_len = 1024 * 1024;
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache) { static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
destoryParseMetaCache(pMetaCache); destoryParseMetaCache(pMetaCache, request);
delete pMetaCache; delete pMetaCache;
} }
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "parInt.h" #include "parInt.h"
using namespace std; using namespace std;
using namespace std::placeholders;
using namespace testing; using namespace testing;
namespace ParserTest { namespace ParserTest {
...@@ -118,8 +119,8 @@ class ParserTestBaseImpl { ...@@ -118,8 +119,8 @@ class ParserTestBaseImpl {
TEST_INTERFACE_ASYNC_API TEST_INTERFACE_ASYNC_API
}; };
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache) { static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
destoryParseMetaCache(pMetaCache); destoryParseMetaCache(pMetaCache, request);
delete pMetaCache; delete pMetaCache;
} }
...@@ -340,7 +341,9 @@ class ParserTestBaseImpl { ...@@ -340,7 +341,9 @@ class ParserTestBaseImpl {
doParse(&cxt, query.get()); doParse(&cxt, query.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
unique_ptr<SParseMetaCache, void (*)(SParseMetaCache*)> metaCache(new SParseMetaCache(), _destoryParseMetaCache); bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request)));
doCollectMetaKey(&cxt, pQuery, metaCache.get()); doCollectMetaKey(&cxt, pQuery, metaCache.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(), unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
...@@ -353,6 +356,8 @@ class ParserTestBaseImpl { ...@@ -353,6 +356,8 @@ class ParserTestBaseImpl {
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData); unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
doGetAllMeta(catalogReq.get(), metaData.get()); doGetAllMeta(catalogReq.get(), metaData.get());
metaCache.reset(new SParseMetaCache());
request = false;
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get()); doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
doAuthenticate(&cxt, pQuery, metaCache.get()); doAuthenticate(&cxt, pQuery, metaCache.get());
......
...@@ -85,7 +85,7 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown ...@@ -85,7 +85,7 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
} }
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.groupId, groupId); planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId);
return setSubplanExecutionNode(subplan->pNode, groupId, pSource); return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
} }
...@@ -104,7 +104,10 @@ static void clearSubplanExecutionNode(SPhysiNode* pNode) { ...@@ -104,7 +104,10 @@ static void clearSubplanExecutionNode(SPhysiNode* pNode) {
FOREACH(pChild, pNode->pChildren) { clearSubplanExecutionNode((SPhysiNode*)pChild); } FOREACH(pChild, pNode->pChildren) { clearSubplanExecutionNode((SPhysiNode*)pChild); }
} }
void qClearSubplanExecutionNode(SSubplan* pSubplan) { clearSubplanExecutionNode(pSubplan->pNode); } void qClearSubplanExecutionNode(SSubplan* pSubplan) {
planDebug("QID:0x%" PRIx64 " clear subplan execution node, groupId:%d", pSubplan->id.queryId, pSubplan->id.groupId);
clearSubplanExecutionNode(pSubplan->pNode);
}
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) { if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
......
...@@ -462,3 +462,5 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { ...@@ -462,3 +462,5 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF);
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
......
...@@ -387,10 +387,13 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int ...@@ -387,10 +387,13 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
char * sql = strndup(msg->msg, msg->sqlLen); char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
return TSDB_CODE_SUCCESS; _return:
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%d", node, code);
return code;
} }
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
......
...@@ -508,7 +508,7 @@ _return: ...@@ -508,7 +508,7 @@ _return:
} }
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
SSubplan *plan = NULL; SSubplan *plan = NULL;
...@@ -536,6 +536,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { ...@@ -536,6 +536,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
} }
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
sql = NULL;
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
...@@ -561,6 +562,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { ...@@ -561,6 +562,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
_return: _return:
taosMemoryFree(sql);
input.code = code; input.code = code;
input.msgType = qwMsg->msgType; input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
......
...@@ -64,7 +64,7 @@ _return: ...@@ -64,7 +64,7 @@ _return:
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) { int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
SSchJob *pJob = schAcquireJob(jobId); SSchJob *pJob = schAcquireJob(jobId);
if (NULL == pJob) { if (NULL == pJob) {
qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId); qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
......
...@@ -284,7 +284,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -284,7 +284,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
for (int32_t i = 0; i < parentNum; ++i) { for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i); SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
SCH_LOCK(SCH_WRITE, &parent->planLock); SCH_LOCK(SCH_WRITE, &parent->planLock);
SDownstreamSourceNode source = { SDownstreamSourceNode source = {
...@@ -298,6 +297,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -298,6 +297,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->planLock); SCH_UNLOCK(SCH_WRITE, &parent->planLock);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId); SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
SCH_ERR_RET(schLaunchTask(pJob, parent)); SCH_ERR_RET(schLaunchTask(pJob, parent));
...@@ -536,6 +537,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { ...@@ -536,6 +537,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
pTask->maxExecTimes++; pTask->maxExecTimes++;
pTask->maxRetryTimes++;
if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) { if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
pTask->timeoutUsec *= 2; pTask->timeoutUsec *= 2;
if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) { if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
......
...@@ -150,7 +150,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) { ...@@ -150,7 +150,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
SSchJob *pJob = schAcquireJob(*jobId); SSchJob *pJob = schAcquireJob(*jobId);
if (NULL == pJob) { if (NULL == pJob) {
qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
return; return;
} }
......
...@@ -95,8 +95,9 @@ typedef void* queue[2]; ...@@ -95,8 +95,9 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval #define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout #define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
typedef SRpcMsg STransMsg; typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx; typedef SRpcCtx STransCtx;
......
...@@ -53,6 +53,7 @@ typedef struct { ...@@ -53,6 +53,7 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType); bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType);
int index; int index;
void* parent; void* parent;
......
...@@ -48,6 +48,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -48,6 +48,7 @@ void* rpcOpen(const SRpcInit* pInit) {
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp; pRpc->retry = pInit->rfp;
pRpc->startTimer = pInit->tfp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
......
...@@ -24,6 +24,7 @@ typedef struct SCliConn { ...@@ -24,6 +24,7 @@ typedef struct SCliConn {
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
queue wreqQueue; queue wreqQueue;
uv_timer_t* timer;
void* hostThrd; void* hostThrd;
...@@ -65,12 +66,13 @@ typedef struct SCliThrd { ...@@ -65,12 +66,13 @@ typedef struct SCliThrd {
int64_t pid; // pid int64_t pid; // pid
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
uv_idle_t* idle;
uv_prepare_t* prepare; uv_prepare_t* prepare;
uv_timer_t timer;
void* pool; // conn pool void* pool; // conn pool
SArray* timerList;
// msg queue // msg queue
queue msg; queue msg;
TdThreadMutex msgMtx; TdThreadMutex msgMtx;
SDelayQueue* delayQueue; SDelayQueue* delayQueue;
...@@ -108,6 +110,8 @@ static int sockDebugInfo(struct sockaddr* sockname, char* dst) { ...@@ -108,6 +110,8 @@ static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r; return r;
} }
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
// static void cliTimeoutCb(uv_timer_t* handle); // static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv // alloc buf for recv
...@@ -330,6 +334,16 @@ void cliHandleResp(SCliConn* conn) { ...@@ -330,6 +334,16 @@ void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
if (conn->timer) {
if (uv_is_active((uv_handle_t*)conn->timer)) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
uv_timer_stop(conn->timer);
}
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
}
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
transDumpFromBuffer(&conn->readBuf, (char**)&pHead); transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
...@@ -409,7 +423,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -409,7 +423,7 @@ void cliHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
void cliHandleExcept(SCliConn* pConn) { void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if (transQueueEmpty(&pConn->cliMsgs)) { if (transQueueEmpty(&pConn->cliMsgs)) {
if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
...@@ -428,7 +442,7 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -428,7 +442,7 @@ void cliHandleExcept(SCliConn* pConn) {
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0}; STransMsg transMsg = {0};
transMsg.code = pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.info.ahandle = NULL; transMsg.info.ahandle = NULL;
...@@ -459,31 +473,17 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -459,31 +473,17 @@ void cliHandleExcept(SCliConn* pConn) {
} while (!transQueueEmpty(&pConn->cliMsgs)); } while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
void cliHandleExcept(SCliConn* conn) {
tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
cliHandleExceptImpl(conn, -1);
}
// void cliTimeoutCb(uv_timer_t* handle) { void cliReadTimeoutCb(uv_timer_t* handle) {
// SCliThrd* pThrd = handle->data; // set up timeout cb
// STrans* pTransInst = pThrd->pTransInst; SCliConn* conn = handle->data;
// int64_t currentTime = pThrd->nextTimeout; tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
// }
// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
// while (p != NULL) {
// while (!QUEUE_IS_EMPTY(&p->conn)) {
// queue* h = QUEUE_HEAD(&p->conn);
// SCliConn* c = QUEUE_DATA(h, SCliConn, q);
// if (c->expireTime < currentTime) {
// QUEUE_REMOVE(h);
// transUnrefCliHandle(c);
// } else {
// break;
// }
// }
// p = taosHashIterate((SHashObj*)pThrd->pool, p);
// }
//
// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
// }
void* createConnPool(int size) { void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
...@@ -654,13 +654,23 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -654,13 +654,23 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
return conn; return conn;
} }
static void cliDestroyConn(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->q); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
if (conn->task != NULL) {
transDQCancel(pThrd->timeoutQueue, conn->task);
conn->task = NULL;
}
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
if (clear) { if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) { if (!uv_is_closing((uv_handle_t*)conn->stream)) {
...@@ -673,8 +683,15 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -673,8 +683,15 @@ static void cliDestroy(uv_handle_t* handle) {
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
return; return;
} }
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd;
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip); taosMemoryFree(conn->ip);
conn->stream->data = NULL; conn->stream->data = NULL;
...@@ -764,6 +781,19 @@ void cliSend(SCliConn* pConn) { ...@@ -764,6 +781,19 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) {
tDebug("no avaiable timer, create");
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, timer);
}
timer->data = pConn;
pConn->timer = timer;
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
}
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
...@@ -781,8 +811,8 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -781,8 +811,8 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
// int addrlen = sizeof(pConn->addr); // int addrlen = sizeof(pConn->addr);
struct sockaddr peername, sockname; struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
int addrlen = sizeof(peername);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
transGetSockDebugInfo(&peername, pConn->dst); transGetSockDebugInfo(&peername, pConn->dst);
...@@ -806,7 +836,6 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -806,7 +836,6 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
tDebug("cli work thread %p start to quit", pThrd); tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg); destroyCmsg(pMsg);
destroyConnPool(pThrd->pool); destroyConnPool(pThrd->pool);
uv_timer_stop(&pThrd->timer);
uv_walk(pThrd->loop, cliWalkCb, NULL); uv_walk(pThrd->loop, cliWalkCb, NULL);
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
...@@ -879,8 +908,8 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { ...@@ -879,8 +908,8 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
} }
} }
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pCtx->epSet)) { if (!EPSET_IS_VALID(&pCtx->epSet)) {
...@@ -966,36 +995,6 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -966,36 +995,6 @@ static void cliAsyncCb(uv_async_t* handle) {
} }
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
} }
static void cliIdleCb(uv_idle_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("do idle work");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
static void cliPrepareCb(uv_prepare_t* handle) { static void cliPrepareCb(uv_prepare_t* handle) {
SCliThrd* thrd = handle->data; SCliThrd* thrd = handle->data;
tTrace("prepare work start"); tTrace("prepare work start");
...@@ -1085,19 +1084,20 @@ static SCliThrd* createThrdObj() { ...@@ -1085,19 +1084,20 @@ static SCliThrd* createThrdObj() {
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd;
// pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t));
// uv_idle_init(pThrd->loop, pThrd->idle);
// pThrd->idle->data = pThrd;
// uv_idle_start(pThrd->idle, cliIdleCb);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare); uv_prepare_init(pThrd->loop, pThrd->prepare);
pThrd->prepare->data = pThrd; pThrd->prepare->data = pThrd;
uv_prepare_start(pThrd->prepare, cliPrepareCb); uv_prepare_start(pThrd->prepare, cliPrepareCb);
int32_t timerSize = 512;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, timer);
taosArrayPush(pThrd->timerList, &timer);
}
pThrd->pool = createConnPool(4); pThrd->pool = createConnPool(4);
transDQCreate(pThrd->loop, &pThrd->delayQueue); transDQCreate(pThrd->loop, &pThrd->delayQueue);
...@@ -1120,7 +1120,12 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1120,7 +1120,12 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->delayQueue, destroyCmsg); transDQDestroy(pThrd->delayQueue, destroyCmsg);
transDQDestroy(pThrd->timeoutQueue, NULL); transDQDestroy(pThrd->timeoutQueue, NULL);
taosMemoryFree(pThrd->idle); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
taosMemoryFree(timer);
}
taosArrayDestroy(pThrd->timerList);
taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
......
...@@ -333,7 +333,7 @@ int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { ...@@ -333,7 +333,7 @@ int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
return -1; return -1;
} }
int32_t nleft, nwritten; int32_t nleft, nwritten;
char * ptr = (char *)buf; char *ptr = (char *)buf;
nleft = nbytes; nleft = nbytes;
...@@ -362,7 +362,7 @@ int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { ...@@ -362,7 +362,7 @@ int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
return -1; return -1;
} }
int32_t nleft, nread; int32_t nleft, nread;
char * ptr = (char *)buf; char *ptr = (char *)buf;
nleft = nbytes; nleft = nbytes;
...@@ -912,7 +912,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) { ...@@ -912,7 +912,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) { if (result) {
struct sockaddr * sa = result->ai_addr; struct sockaddr *sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in *)sa; struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr; struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr; uint32_t ip = ia.s_addr;
......
...@@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish c ...@@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish c
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
......
...@@ -49,11 +49,13 @@ if [ $ent -eq 0 ]; then ...@@ -49,11 +49,13 @@ if [ $ent -eq 0 ]; then
export PATH=$PATH:/home/TDengine/debug/build/bin export PATH=$PATH:/home/TDengine/debug/build/bin
export LD_LIBRARY_PATH=/home/TDengine/debug/build/lib export LD_LIBRARY_PATH=/home/TDengine/debug/build/lib
ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null
ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so.1 2>/dev/null
CONTAINER_TESTDIR=/home/TDengine CONTAINER_TESTDIR=/home/TDengine
else else
export PATH=$PATH:/home/TDinternal/debug/build/bin export PATH=$PATH:/home/TDinternal/debug/build/bin
export LD_LIBRARY_PATH=/home/TDinternal/debug/build/lib export LD_LIBRARY_PATH=/home/TDinternal/debug/build/lib
ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null
ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so.1 2>/dev/null
CONTAINER_TESTDIR=/home/TDinternal/community CONTAINER_TESTDIR=/home/TDinternal/community
fi fi
mkdir -p /var/lib/taos/subscribe mkdir -p /var/lib/taos/subscribe
......
...@@ -100,6 +100,7 @@ docker run \ ...@@ -100,6 +100,7 @@ docker run \
-v "$TMP_DIR/thread_volume/$thread_no/sim:${SIM_DIR}" \ -v "$TMP_DIR/thread_volume/$thread_no/sim:${SIM_DIR}" \
-v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \ -v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \
-v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \ -v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \
-v $WORKDIR/taos-connector-python/taosrest:/usr/local/lib/python3.8/site-packages/taosrest:ro \
--rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $extra_param --rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $extra_param
ret=$? ret=$?
exit $ret exit $ret
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册