提交 b8391fad 编写于 作者: D dapan1121

fix heartbeat msg

上级 6f12bc10
...@@ -341,12 +341,12 @@ int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); ...@@ -341,12 +341,12 @@ int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
int64_t clusterId; int64_t clusterId;
int32_t connId; uint32_t connId;
int8_t superUser; int8_t superUser;
SEpSet epSet; SEpSet epSet;
char sVersion[128]; char sVersion[128];
} SConnectRsp; } SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
...@@ -1038,40 +1038,6 @@ typedef struct { ...@@ -1038,40 +1038,6 @@ typedef struct {
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
int32_t queryId;
int64_t useconds;
int64_t stime;
int64_t qId;
int64_t sqlObjId;
int32_t pid;
char fqdn[TSDB_FQDN_LEN];
int8_t stableQuery;
int32_t numOfSub;
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
} SQueryDesc;
typedef struct {
int32_t connId;
int32_t pid;
int32_t numOfQueries;
int32_t numOfStreams;
char app[TSDB_APP_NAME_LEN];
char pData[];
} SHeartBeatReq;
typedef struct {
int32_t connId;
int32_t queryId;
int32_t streamId;
int32_t totalDnodes;
int32_t onlineDnodes;
int8_t killConnection;
int8_t align[3];
SEpSet epSet;
} SHeartBeatRsp;
typedef struct { typedef struct {
int32_t connId; int32_t connId;
int32_t queryId; int32_t queryId;
...@@ -1674,13 +1640,48 @@ typedef struct { ...@@ -1674,13 +1640,48 @@ typedef struct {
} SKv; } SKv;
typedef struct { typedef struct {
int32_t connId; int64_t tscRid;
int32_t hbType; int32_t hbType;
} SClientHbKey; } SClientHbKey;
typedef struct { typedef struct {
SClientHbKey connKey; int64_t tid;
SHashObj* info; // hash<Skv.key, Skv> int32_t status;
} SQuerySubDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
uint64_t queryId;
int64_t useconds;
int64_t stime;
int64_t reqRid;
int32_t pid;
char fqdn[TSDB_FQDN_LEN];
int32_t subPlanNum;
SArray* subDesc; // SArray<SQuerySubDesc>
} SQueryDesc;
typedef struct {
uint32_t connId;
int32_t pid;
char app[TSDB_APP_NAME_LEN];
SArray* queryDesc; // SArray<SQueryDesc>
} SQueryHbReqBasic;
typedef struct {
uint32_t connId;
uint64_t killRid;
int32_t totalDnodes;
int32_t onlineDnodes;
int8_t killConnection;
int8_t align[3];
SEpSet epSet;
} SQueryHbRspBasic;
typedef struct {
SClientHbKey connKey;
SQueryHbReqBasic* query;
SHashObj* info; // hash<Skv.key, Skv>
} SClientHbReq; } SClientHbReq;
typedef struct { typedef struct {
...@@ -1689,9 +1690,10 @@ typedef struct { ...@@ -1689,9 +1690,10 @@ typedef struct {
} SClientHbBatchReq; } SClientHbBatchReq;
typedef struct { typedef struct {
SClientHbKey connKey; SClientHbKey connKey;
int32_t status; int32_t status;
SArray* info; // Array<Skv> SQueryHbRspBasic* query;
SArray* info; // Array<Skv>
} SClientHbRsp; } SClientHbRsp;
typedef struct { typedef struct {
...@@ -1711,8 +1713,23 @@ static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { ...@@ -1711,8 +1713,23 @@ static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
} }
} }
static FORCE_INLINE void tFreeClientHbQueryDesc(void* pDesc) {
SQueryDesc* desc = (SQueryDesc*)pDesc;
if (desc->subDesc) {
taosArrayDestroy(desc->subDesc);
desc->subDesc = NULL;
}
}
static FORCE_INLINE void tFreeClientHbReq(void* pReq) { static FORCE_INLINE void tFreeClientHbReq(void* pReq) {
SClientHbReq* req = (SClientHbReq*)pReq; SClientHbReq* req = (SClientHbReq*)pReq;
if (req->query) {
if (req->query->queryDesc) {
taosArrayDestroyEx(req->query->queryDesc, tFreeClientHbQueryDesc);
}
taosMemoryFreeClear(req->query);
}
if (req->info) { if (req->info) {
tFreeReqKvHash(req->info); tFreeReqKvHash(req->info);
taosHashCleanup(req->info); taosHashCleanup(req->info);
...@@ -1741,6 +1758,7 @@ static FORCE_INLINE void tFreeClientKv(void* pKv) { ...@@ -1741,6 +1758,7 @@ static FORCE_INLINE void tFreeClientKv(void* pKv) {
static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) { static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) {
SClientHbRsp* rsp = (SClientHbRsp*)pRsp; SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
taosMemoryFreeClear(rsp->query);
if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
} }
...@@ -1769,13 +1787,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { ...@@ -1769,13 +1787,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
} }
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) { static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
if (tEncodeI32(pEncoder, pKey->connId) < 0) return -1; if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1;
if (tEncodeI32(pEncoder, pKey->hbType) < 0) return -1; if (tEncodeI32(pEncoder, pKey->hbType) < 0) return -1;
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) { static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
if (tDecodeI32(pDecoder, &pKey->connId) < 0) return -1; if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1;
if (tDecodeI32(pDecoder, &pKey->hbType) < 0) return -1; if (tDecodeI32(pDecoder, &pKey->hbType) < 0) return -1;
return 0; return 0;
} }
......
...@@ -89,6 +89,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD ...@@ -89,6 +89,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD
*/ */
int32_t schedulerFetchRows(int64_t job, void **data); int32_t schedulerFetchRows(int64_t job, void **data);
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
/** /**
* Cancel query job * Cancel query job
......
...@@ -205,6 +205,14 @@ SArray* taosArrayDup(const SArray* pSrc); ...@@ -205,6 +205,14 @@ SArray* taosArrayDup(const SArray* pSrc);
*/ */
void taosArrayClear(SArray* pArray); void taosArrayClear(SArray* pArray);
/**
* clear the array (remove all element)
* @param pArray
* @param fp
*/
void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
/** /**
* destroy array list * destroy array list
* @param pArray * @param pArray
......
...@@ -43,7 +43,8 @@ extern "C" { ...@@ -43,7 +43,8 @@ extern "C" {
} \ } \
} while (0) } while (0)
#define HEARTBEAT_INTERVAL 1500 // ms //#define HEARTBEAT_INTERVAL 1500 // ms
#define HEARTBEAT_INTERVAL 15000 // ms TODO
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
...@@ -139,6 +140,7 @@ typedef struct STscObj { ...@@ -139,6 +140,7 @@ typedef struct STscObj {
TdThreadMutex mutex; // used to protect the operation on db TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo; SAppInstInfo* pAppInfo;
SHashObj* pRequests;
} STscObj; } STscObj;
typedef struct SResultColumn { typedef struct SResultColumn {
...@@ -215,11 +217,15 @@ int taos_init(); ...@@ -215,11 +217,15 @@ int taos_init();
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj); void destroyTscObj(void* pObj);
STscObj *acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid);
uint64_t generateRequestId(); uint64_t generateRequestId();
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest); void destroyRequest(SRequestObj* pRequest);
SRequestObj *acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
char* getDbOfConnection(STscObj* pObj); char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db); void setConnectionDB(STscObj* pTscObj, const char* db);
...@@ -258,7 +264,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); ...@@ -258,7 +264,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void); void appHbMgrCleanup(void);
// conn level // conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType); int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int32_t hbType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
......
...@@ -37,7 +37,8 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; ...@@ -37,7 +37,8 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0; volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj *pRequest) { static void registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id); STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
assert(pTscObj != NULL); assert(pTscObj != NULL);
// connection has been released already, abort creating request. // connection has been released already, abort creating request.
...@@ -69,7 +70,7 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -69,7 +70,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d", " ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst);
taosReleaseRef(clientConnRefPool, pTscObj->id); releaseTscObj(pTscObj->id);
} }
// todo close the transporter properly // todo close the transporter properly
...@@ -107,12 +108,24 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -107,12 +108,24 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
return pDnodeConn; return pDnodeConn;
} }
void closeAllRequests(SHashObj *pRequests) {
void *pIter = taosHashIterate(pRequests, NULL);
while (pIter != NULL) {
int64_t *rid = pIter;
releaseRequest(*rid);
pIter = taosHashIterate(pRequests, pIter);
}
}
void destroyTscObj(void *pObj) { void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj; STscObj *pTscObj = pObj;
SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType}; SClientHbKey connKey = {.tscRid = pTscObj->id, .hbType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests);
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
taosThreadMutexDestroy(&pTscObj->mutex); taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj); taosMemoryFreeClear(pTscObj);
...@@ -125,6 +138,13 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI ...@@ -125,6 +138,13 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
return NULL; return NULL;
} }
pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pObj->pRequests) {
taosMemoryFree(pObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
pObj->pAppInfo = pAppInfo; pObj->pAppInfo = pAppInfo;
tstrncpy(pObj->user, user, sizeof(pObj->user)); tstrncpy(pObj->user, user, sizeof(pObj->user));
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
...@@ -140,6 +160,14 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI ...@@ -140,6 +160,14 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
return pObj; return pObj;
} }
STscObj *acquireTscObj(int64_t rid) {
return (STscObj *)taosAcquireRef(clientConnRefPool, rid);
}
int32_t releaseTscObj(int64_t rid) {
return taosReleaseRef(clientConnRefPool, rid);
}
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) { void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
assert(pObj != NULL); assert(pObj != NULL);
...@@ -160,6 +188,14 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty ...@@ -160,6 +188,14 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
tsem_init(&pRequest->body.rspSem, 0, 0); tsem_init(&pRequest->body.rspSem, 0, 0);
registerRequest(pRequest); registerRequest(pRequest);
if (taosHashPut(pObj->pRequests, &pRequest->self, sizeof(pRequest->self), &pRequest->self, sizeof(pRequest->self))) {
destroyRequest(pRequest);
releaseTscObj(pObj->id);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
return pRequest; return pRequest;
} }
...@@ -185,6 +221,8 @@ static void doDestroyRequest(void *p) { ...@@ -185,6 +221,8 @@ static void doDestroyRequest(void *p) {
assert(RID_VALID(pRequest->self)); assert(RID_VALID(pRequest->self));
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pInfo); taosMemoryFreeClear(pRequest->pInfo);
...@@ -213,9 +251,18 @@ void destroyRequest(SRequestObj *pRequest) { ...@@ -213,9 +251,18 @@ void destroyRequest(SRequestObj *pRequest) {
return; return;
} }
taosReleaseRef(clientReqRefPool, pRequest->self); taosRemoveRef(clientReqRefPool, pRequest->self);
} }
SRequestObj *acquireRequest(int64_t rid) {
return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid);
}
int32_t releaseRequest(int64_t rid) {
return taosReleaseRef(clientReqRefPool, rid);
}
void taos_init_imp(void) { void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet. // In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
...@@ -456,11 +503,18 @@ uint64_t generateRequestId() { ...@@ -456,11 +503,18 @@ uint64_t generateRequestId() {
} }
} }
int64_t ts = taosGetTimestampMs(); uint64_t id = 0;
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1); while (true) {
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
if (id) {
break;
}
}
return id; return id;
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "catalog.h" #include "catalog.h"
#include "scheduler.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "trpc.h" #include "trpc.h"
...@@ -107,10 +108,36 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo ...@@ -107,10 +108,36 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) { if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.hbType);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pRsp->query) {
STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
if (NULL == pTscObj) {
tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
} else {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
pTscObj->connId = pRsp->query->connId;
if (pRsp->query->killRid) {
SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
if (NULL == pRequest) {
tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid);
} else {
taos_stop_query((TAOS_RES *)pRequest);
releaseRequest(pRsp->query->killRid);
}
}
if (pRsp->query->killConnection) {
taos_close(pTscObj);
}
releaseTscObj(pRsp->connKey.tscRid);
}
}
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
tscDebug("hb got %d rsp kv", kvNum); tscDebug("hb got %d rsp kv", kvNum);
...@@ -206,6 +233,97 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) ...@@ -206,6 +233,97 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code)
return code; return code;
} }
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
int64_t now = taosGetTimestampUs();
SQueryDesc desc = {0};
int32_t code = 0;
void *pIter = taosHashIterate(pObj->pRequests, NULL);
while (pIter != NULL) {
int64_t *rid = pIter;
SRequestObj *pRequest = acquireRequest(*rid);
if (NULL == pRequest) {
continue;
}
tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
desc.stime = pRequest->metric.start;
desc.queryId = pRequest->requestId;
desc.useconds = now - pRequest->metric.start;
desc.reqRid = pRequest->self;
desc.pid = hbBasic->pid;
taosGetFqdn(desc.fqdn);
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
if (desc.subPlanNum) {
desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
if (NULL == desc.subDesc) {
releaseRequest(*rid);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
if (code) {
taosArrayDestroy(desc.subDesc);
desc.subDesc = NULL;
}
}
releaseRequest(*rid);
taosArrayPush(hbBasic->queryDesc, &desc);
pIter = taosHashIterate(pObj->pRequests, pIter);
}
return TSDB_CODE_SUCCESS;
}
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (NULL == pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_QRY_APP_ERROR;
}
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
if (numOfQueries <= 0) {
releaseTscObj(connKey->tscRid);
tscDebug("no queries on connection");
return TSDB_CODE_QRY_APP_ERROR;
}
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
if (NULL == hbBasic) {
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
releaseTscObj(connKey->tscRid);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
if (NULL == hbBasic->queryDesc) {
tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
releaseTscObj(connKey->tscRid);
taosMemoryFree(hbBasic);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
hbBasic->connId = pTscObj->connId;
hbBasic->pid = taosGetPId();
taosGetAppName(hbBasic->app, NULL);
int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
if (code) {
releaseTscObj(connKey->tscRid);
taosMemoryFree(hbBasic);
return code;
}
req->query = hbBasic;
releaseTscObj(connKey->tscRid);
return TSDB_CODE_SUCCESS;
}
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SDbVgVersion *dbs = NULL; SDbVgVersion *dbs = NULL;
uint32_t dbNum = 0; uint32_t dbNum = 0;
...@@ -284,6 +402,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req ...@@ -284,6 +402,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return code; return code;
} }
hbGetQueryBasicInfo(connKey, req);
code = hbGetExpiredDBInfo(connKey, pCatalog, req); code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
...@@ -316,6 +436,11 @@ void hbFreeReq(void *req) { ...@@ -316,6 +436,11 @@ void hbFreeReq(void *req) {
tFreeReqKvHash(pReq->info); tFreeReqKvHash(pReq->info);
} }
void hbClearClientHbReq(SClientHbReq *pReq) {
pReq->query = NULL;
pReq->info = NULL;
}
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq)); SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) { if (pBatchReq == NULL) {
...@@ -334,20 +459,21 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -334,20 +459,21 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
if (info) { if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq);
if (code) { if (code) {
taosHashCancelIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
break; continue;
} }
} }
taosArrayPush(pBatchReq->reqs, pOneReq); taosArrayPush(pBatchReq->reqs, pOneReq);
hbClearClientHbReq(pOneReq);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
} }
if (code) { // if (code) {
taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq); // taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
taosMemoryFreeClear(pBatchReq); // taosMemoryFreeClear(pBatchReq);
} // }
return pBatchReq; return pBatchReq;
} }
...@@ -548,7 +674,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * ...@@ -548,7 +674,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
if (data != NULL) { if (data != NULL) {
return 0; return 0;
} }
SClientHbReq hbReq; SClientHbReq hbReq = {0};
hbReq.connKey = connKey; hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
...@@ -565,9 +691,9 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * ...@@ -565,9 +691,9 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
return 0; return 0;
} }
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int32_t hbType) {
SClientHbKey connKey = { SClientHbKey connKey = {
.connId = connId, .tscRid = tscRefId,
.hbType = HEARTBEAT_TYPE_QUERY, .hbType = HEARTBEAT_TYPE_QUERY,
}; };
SHbConnInfo info = {0}; SHbConnInfo info = {0};
......
...@@ -458,7 +458,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -458,7 +458,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
taos_close(pTscObj); taos_close(pTscObj);
pTscObj = NULL; pTscObj = NULL;
} else { } else {
tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
destroyRequest(pRequest); destroyRequest(pRequest);
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "catalog.h" #include "catalog.h"
#include "scheduler.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "os.h" #include "os.h"
...@@ -66,6 +67,7 @@ void taos_cleanup(void) { ...@@ -66,6 +67,7 @@ void taos_cleanup(void) {
rpcCleanup(); rpcCleanup();
catalogDestroy(); catalogDestroy();
schedulerDestroy();
taosCloseLog(); taosCloseLog();
tscInfo("all local resources released"); tscInfo("all local resources released");
...@@ -98,7 +100,7 @@ void taos_close(TAOS *taos) { ...@@ -98,7 +100,7 @@ void taos_close(TAOS *taos) {
STscObj *pTscObj = (STscObj *)taos; STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
/*taosRemoveRef(clientConnRefPool, pTscObj->id);*/ taosRemoveRef(clientConnRefPool, pTscObj->id);
} }
int taos_errno(TAOS_RES *tres) { int taos_errno(TAOS_RES *tres) {
...@@ -366,7 +368,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -366,7 +368,7 @@ void taos_stop_query(TAOS_RES *res) {
return; return;
} }
// scheduleCancelJob(pRequest->body.pQueryJob); schedulerFreeJob(pRequest->body.queryJob);
} }
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
......
...@@ -71,7 +71,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -71,7 +71,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->connType = HEARTBEAT_TYPE_QUERY; pTscObj->connType = HEARTBEAT_TYPE_QUERY;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY); hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
......
...@@ -134,6 +134,42 @@ void *taosDecodeSEpSet(void *buf, SEpSet *pEp) { ...@@ -134,6 +134,42 @@ void *taosDecodeSEpSet(void *buf, SEpSet *pEp) {
static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) { static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) {
if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1;
if (pReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
int32_t queryNum = 0;
if (pReq->query) {
queryNum = 1;
if (tEncodeI32(pEncoder, queryNum) < 0) return -1;
if (tEncodeU32(pEncoder, pReq->query->connId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->query->pid) < 0) return -1;
if (tEncodeCStr(pEncoder, pReq->query->app) < 0) return -1;
int32_t num = taosArrayGetSize(pReq->query->queryDesc);
if (tEncodeI32(pEncoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SQueryDesc *desc = taosArrayGet(pReq->query->queryDesc, i);
if (tEncodeCStr(pEncoder, desc->sql) < 0) return -1;
if (tEncodeU64(pEncoder, desc->queryId) < 0) return -1;
if (tEncodeI64(pEncoder, desc->useconds) < 0) return -1;
if (tEncodeI64(pEncoder, desc->stime) < 0) return -1;
if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1;
if (tEncodeI32(pEncoder, desc->pid) < 0) return -1;
if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1;
if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1;
int32_t snum = desc->subDesc ? taosArrayGetSize(desc->subDesc) : 0;
if (tEncodeI32(pEncoder, snum) < 0) return -1;
for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m);
if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1;
if (tEncodeI32(pEncoder, sDesc->status) < 0) return -1;
}
}
} else {
if (tEncodeI32(pEncoder, queryNum) < 0) return -1;
}
}
int32_t kvNum = taosHashGetSize(pReq->info); int32_t kvNum = taosHashGetSize(pReq->info);
if (tEncodeI32(pEncoder, kvNum) < 0) return -1; if (tEncodeI32(pEncoder, kvNum) < 0) return -1;
void *pIter = taosHashIterate(pReq->info, NULL); void *pIter = taosHashIterate(pReq->info, NULL);
...@@ -149,6 +185,53 @@ static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq ...@@ -149,6 +185,53 @@ static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq
static int32_t tDeserializeSClientHbReq(SCoder *pDecoder, SClientHbReq *pReq) { static int32_t tDeserializeSClientHbReq(SCoder *pDecoder, SClientHbReq *pReq) {
if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1; if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1;
if (pReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
int32_t queryNum = 0;
if (tDecodeI32(pDecoder, &queryNum) < 0) return -1;
if (queryNum) {
pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query));
if (NULL == pReq->query) return -1;
if (tDecodeU32(pDecoder, &pReq->query->connId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->query->pid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pReq->query->app) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(pDecoder, &num) < 0) return -1;
if (num > 0) {
pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc));
if (NULL == pReq->query->queryDesc) return -1;
for (int32_t i = 0; i < num; ++i) {
SQueryDesc desc = {0};
if (tDecodeCStrTo(pDecoder, desc.sql) < 0) return -1;
if (tDecodeU64(pDecoder, &desc.queryId) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.useconds) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1;
int32_t snum = 0;
if (tDecodeI32(pDecoder, &snum) < 0) return -1;
if (snum > 0) {
desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc));
if (NULL == desc.subDesc) return -1;
for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc sDesc = {0};
if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1;
if (tDecodeI32(pDecoder, &sDesc.status) < 0) return -1;
taosArrayPush(desc.subDesc, &sDesc);
}
}
taosArrayPush(pReq->query->queryDesc, &desc);
}
}
}
}
int32_t kvNum = 0; int32_t kvNum = 0;
if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; if (tDecodeI32(pDecoder, &kvNum) < 0) return -1;
if (pReq->info == NULL) { if (pReq->info == NULL) {
...@@ -168,6 +251,20 @@ static int32_t tSerializeSClientHbRsp(SCoder *pEncoder, const SClientHbRsp *pRsp ...@@ -168,6 +251,20 @@ static int32_t tSerializeSClientHbRsp(SCoder *pEncoder, const SClientHbRsp *pRsp
if (tEncodeSClientHbKey(pEncoder, &pRsp->connKey) < 0) return -1; if (tEncodeSClientHbKey(pEncoder, &pRsp->connKey) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->status) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->status) < 0) return -1;
int32_t queryNum = 0;
if (pRsp->query) {
queryNum = 1;
if (tEncodeI32(pEncoder, queryNum) < 0) return -1;
if (tEncodeU32(pEncoder, pRsp->query->connId) < 0) return -1;
if (tEncodeU64(pEncoder, pRsp->query->killRid) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->query->totalDnodes) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->query->onlineDnodes) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->query->killConnection) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pRsp->query->epSet) < 0) return -1;
} else {
if (tEncodeI32(pEncoder, queryNum) < 0) return -1;
}
int32_t kvNum = taosArrayGetSize(pRsp->info); int32_t kvNum = taosArrayGetSize(pRsp->info);
if (tEncodeI32(pEncoder, kvNum) < 0) return -1; if (tEncodeI32(pEncoder, kvNum) < 0) return -1;
for (int32_t i = 0; i < kvNum; i++) { for (int32_t i = 0; i < kvNum; i++) {
...@@ -182,6 +279,19 @@ static int32_t tDeserializeSClientHbRsp(SCoder *pDecoder, SClientHbRsp *pRsp) { ...@@ -182,6 +279,19 @@ static int32_t tDeserializeSClientHbRsp(SCoder *pDecoder, SClientHbRsp *pRsp) {
if (tDecodeSClientHbKey(pDecoder, &pRsp->connKey) < 0) return -1; if (tDecodeSClientHbKey(pDecoder, &pRsp->connKey) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->status) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->status) < 0) return -1;
int32_t queryNum = 0;
if (tDecodeI32(pDecoder, &queryNum) < 0) return -1;
if (queryNum) {
pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query));
if (NULL == pRsp->query) return -1;
if (tDecodeU32(pDecoder, &pRsp->query->connId) < 0) return -1;
if (tDecodeU64(pDecoder, &pRsp->query->killRid) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->query->totalDnodes) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->query->onlineDnodes) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->query->killConnection) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pRsp->query->epSet) < 0) return -1;
}
int32_t kvNum = 0; int32_t kvNum = 0;
if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; if (tDecodeI32(pDecoder, &kvNum) < 0) return -1;
pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); pRsp->info = taosArrayInit(kvNum, sizeof(SKv));
...@@ -224,8 +334,9 @@ int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchR ...@@ -224,8 +334,9 @@ int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchR
int32_t reqNum = 0; int32_t reqNum = 0;
if (tDecodeI32(&decoder, &reqNum) < 0) return -1; if (tDecodeI32(&decoder, &reqNum) < 0) return -1;
if (pBatchReq->reqs == NULL) { if (reqNum > 0) {
pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq)); pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq));
if (NULL == pBatchReq->reqs) return -1;
} }
for (int32_t i = 0; i < reqNum; i++) { for (int32_t i = 0; i < reqNum; i++) {
SClientHbReq req = {0}; SClientHbReq req = {0};
...@@ -2564,7 +2675,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -2564,7 +2675,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1; if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1; if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1; if (tEncodeU32(&encoder, pRsp->connId) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1;
...@@ -2582,7 +2693,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -2582,7 +2693,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1; if (tDecodeU32(&decoder, &pRsp->connId) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1;
......
...@@ -75,7 +75,6 @@ typedef struct { ...@@ -75,7 +75,6 @@ typedef struct {
} SShowMgmt; } SShowMgmt;
typedef struct { typedef struct {
int32_t connId;
SCacheObj *cache; SCacheObj *cache;
} SProfileMgmt; } SProfileMgmt;
......
...@@ -1128,6 +1128,8 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { ...@@ -1128,6 +1128,8 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) { if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
} else {
code = 0;
} }
} else { } else {
usedbRsp.vgVersion = usedbReq.vgVersion; usedbRsp.vgVersion = usedbReq.vgVersion;
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#define QUERY_SAVE_SIZE 20 #define QUERY_SAVE_SIZE 20
typedef struct { typedef struct {
int32_t id; uint32_t id;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time int64_t appStartTimeMs; // app start time
...@@ -39,15 +39,15 @@ typedef struct { ...@@ -39,15 +39,15 @@ typedef struct {
int8_t killed; int8_t killed;
int64_t loginTimeMs; int64_t loginTimeMs;
int64_t lastAccessTimeMs; int64_t lastAccessTimeMs;
int32_t queryId; uint64_t killId;
int32_t numOfQueries; int32_t numOfQueries;
SQueryDesc *pQueries; SArray *pQueries; //SArray<SQueryDesc>
} SConnObj; } SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid, static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
const char *app, int64_t startTime); const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn); static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter); static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
...@@ -97,8 +97,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui ...@@ -97,8 +97,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui
const char *app, int64_t startTime) { const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); char connStr[255] = {0};
if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1); int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
int32_t connId = mndGenerateUid(connStr, len);
if (startTime == 0) startTime = taosGetTimestampMs(); if (startTime == 0) startTime = taosGetTimestampMs();
SConnObj connObj = {.id = connId, SConnObj connObj = {.id = connId,
...@@ -109,7 +110,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui ...@@ -109,7 +110,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui
.killed = 0, .killed = 0,
.loginTimeMs = taosGetTimestampMs(), .loginTimeMs = taosGetTimestampMs(),
.lastAccessTimeMs = 0, .lastAccessTimeMs = 0,
.queryId = 0, .killId = 0,
.numOfQueries = 0, .numOfQueries = 0,
.pQueries = NULL}; .pQueries = NULL};
...@@ -124,35 +125,35 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui ...@@ -124,35 +125,35 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui
mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr()); mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
return NULL; return NULL;
} else { } else {
mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, user); mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
return pConn; return pConn;
} }
} }
static void mndFreeConn(SConnObj *pConn) { static void mndFreeConn(SConnObj *pConn) {
taosMemoryFreeClear(pConn->pQueries); taosMemoryFreeClear(pConn->pQueries);
mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn); mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
} }
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) { static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(connId));
if (pConn == NULL) { if (pConn == NULL) {
mDebug("conn:%d, already destroyed", connId); mDebug("conn:%u, already destroyed", connId);
return NULL; return NULL;
} }
int32_t keepTime = tsShellActivityTimer * 3; int32_t keepTime = tsShellActivityTimer * 3;
pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs(); pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn); mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
return pConn; return pConn;
} }
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
if (pConn == NULL) return; if (pConn == NULL) return;
mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn); mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false); taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
...@@ -217,6 +218,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -217,6 +218,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
goto CONN_OVER; goto CONN_OVER;
} }
mndAcquireConn(pMnode, pConn->id);
SConnectRsp connectRsp = {0}; SConnectRsp connectRsp = {0};
connectRsp.acctId = pUser->acctId; connectRsp.acctId = pUser->acctId;
connectRsp.superUser = pUser->superUser; connectRsp.superUser = pUser->superUser;
...@@ -236,7 +239,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -236,7 +239,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
pReq->rspLen = contLen; pReq->rspLen = contLen;
pReq->pRsp = pRsp; pReq->pRsp = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", pReq->user, ip, pConn->id, connReq.app); mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->user, ip, pConn->port, pConn->id, connReq.app);
code = 0; code = 0;
...@@ -249,22 +252,13 @@ CONN_OVER: ...@@ -249,22 +252,13 @@ CONN_OVER:
return code; return code;
} }
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn->numOfQueries = 0; taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
int32_t numOfQueries = htonl(pReq->numOfQueries);
if (numOfQueries > 0) {
if (pConn->pQueries == NULL) {
pConn->pQueries = taosMemoryCalloc(sizeof(SQueryDesc), QUERY_SAVE_SIZE);
}
pConn->numOfQueries = TMIN(QUERY_SAVE_SIZE, numOfQueries);
int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc); pConn->pQueries = pBasic->queryDesc;
if (saveSize > 0 && pConn->pQueries != NULL) { pBasic->queryDesc = NULL;
memcpy(pConn->pQueries, pReq->pData, saveSize);
} pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -334,6 +328,111 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { ...@@ -334,6 +328,111 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
return NULL; return NULL;
} }
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query;
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(pMsg->handle, &connInfo);
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, connInfo.user, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0);
if (pConn == NULL) {
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
return -1;
} else {
mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
}
} else if (pConn->killed) {
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1;
}
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
if (rspBasic == NULL) {
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
return -1;
}
mndSaveQueryList(pConn, pBasic);
if (pConn->killed != 0) {
rspBasic->killConnection = 1;
}
if (pConn->killId != 0) {
rspBasic->killRid = pConn->killId;
pConn->killId = 0;
}
rspBasic->connId = pConn->id;
rspBasic->totalDnodes = 1; //TODO
rspBasic->onlineDnodes = 1; //TODO
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
mndReleaseConn(pMnode, pConn);
hbRsp.query = rspBasic;
}
int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) {
taosArrayPush(pBatchRsp->rsps, &hbRsp);
return TSDB_CODE_SUCCESS;
}
hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
if (NULL == hbRsp.info) {
mError("taosArrayInit %d rsp kv failed", kvNum);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *pIter = taosHashIterate(pHbReq->info, NULL);
while (pIter != NULL) {
SKv *kv = pIter;
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
default:
mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_MND_APP_ERROR;
break;
}
pIter = taosHashIterate(pHbReq->info, pIter);
}
taosArrayPush(pBatchRsp->rsps, &hbRsp);
return TSDB_CODE_SUCCESS;
}
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
...@@ -351,48 +450,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { ...@@ -351,48 +450,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i); SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
int32_t kvNum = taosHashGetSize(pHbReq->info); mndProcessQueryHeartBeat(pMnode, &pReq->rpcMsg, pHbReq, &batchRsp);
if (NULL == pHbReq->info || kvNum <= 0) {
continue;
}
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};
void *pIter = taosHashIterate(pHbReq->info, NULL);
while (pIter != NULL) {
SKv *kv = pIter;
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
default:
mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_MND_APP_ERROR;
break;
}
pIter = taosHashIterate(pHbReq->info, pIter);
}
taosArrayPush(batchRsp.rsps, &hbRsp);
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
if (pRsp != NULL) { if (pRsp != NULL) {
...@@ -421,73 +479,8 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { ...@@ -421,73 +479,8 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
taosArrayDestroy(batchRsp.rsps); taosArrayDestroy(batchRsp.rsps);
pReq->rspLen = tlen; pReq->rspLen = tlen;
pReq->pRsp = buf; pReq->pRsp = buf;
return 0;
#if 0
SMnode *pMnode = pReq->pNode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
pHeartbeat->connId = htonl(pHeartbeat->connId);
pHeartbeat->pid = htonl(pHeartbeat->pid);
SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
if (pConn == NULL) {
mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
return -1;
} else {
mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
}
} else if (pConn->killed) {
mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1;
} else {
if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
char oldIpStr[40];
char newIpStr[40];
taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
pConn->user, oldIpStr);
if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1;
}
}
SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
if (pRsp == NULL) {
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
return -1;
}
mndSaveQueryStreamList(pConn, pHeartbeat);
if (pConn->killed != 0) {
pRsp->killConnection = 1;
}
if (pConn->queryId != 0) {
pRsp->queryId = htonl(pConn->queryId);
pConn->queryId = 0;
}
pRsp->connId = htonl(pConn->id);
pRsp->totalDnodes = htonl(1);
pRsp->onlineDnodes = htonl(1);
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
mndReleaseConn(pMnode, pConn);
pReq->contLen = sizeof(SConnectRsp);
pReq->pRsp = pRsp;
return 0; return 0;
#endif
} }
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
...@@ -518,7 +511,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { ...@@ -518,7 +511,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
return -1; return -1;
} else { } else {
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user); mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user);
pConn->queryId = killReq.queryId; pConn->killId = killReq.queryId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false); taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return 0; return 0;
} }
...@@ -651,7 +644,7 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int ...@@ -651,7 +644,7 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int
cols = 0; cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pConn->id; *(uint32_t *)pWrite = pConn->id;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
...@@ -808,6 +801,7 @@ static int32_t mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p ...@@ -808,6 +801,7 @@ static int32_t mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
int32_t numOfRows = 0; int32_t numOfRows = 0;
#if 0
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t cols = 0; int32_t cols = 0;
char *pWrite; char *pWrite;
...@@ -905,6 +899,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i ...@@ -905,6 +899,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
#endif
return numOfRows; return numOfRows;
} }
...@@ -917,4 +912,4 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) { ...@@ -917,4 +912,4 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
int32_t mndGetNumOfConnections(SMnode *pMnode) { int32_t mndGetNumOfConnections(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
return taosCacheGetNumOfObj(pMgmt->cache); return taosCacheGetNumOfObj(pMgmt->cache);
} }
\ No newline at end of file
...@@ -156,8 +156,8 @@ typedef struct SSchJob { ...@@ -156,8 +156,8 @@ typedef struct SSchJob {
int32_t levelNum; int32_t levelNum;
int32_t taskNum; int32_t taskNum;
void *transport; void *transport;
SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeAddr>
SArray *levels; // Element is SQueryLevel, starting from 0. SArray<SSchLevel> SArray *levels; // starting from 0. SArray<SSchLevel>
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
int32_t levelIdx; int32_t levelIdx;
......
...@@ -2655,6 +2655,34 @@ _return: ...@@ -2655,6 +2655,34 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
int32_t code = 0;
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
qDebug("job not initialized or not executable job, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
taosArrayPush(pSub, &subDesc);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t scheduleCancelJob(int64_t job) { int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) { if (NULL == pJob) {
...@@ -2672,7 +2700,7 @@ int32_t scheduleCancelJob(int64_t job) { ...@@ -2672,7 +2700,7 @@ int32_t scheduleCancelJob(int64_t job) {
void schedulerFreeJob(int64_t job) { void schedulerFreeJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) { if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
return; return;
} }
......
...@@ -303,6 +303,21 @@ void taosArrayClear(SArray* pArray) { ...@@ -303,6 +303,21 @@ void taosArrayClear(SArray* pArray) {
pArray->size = 0; pArray->size = 0;
} }
void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
if (pArray == NULL) return;
if (fp == NULL) {
pArray->size = 0;
return;
}
for (int32_t i = 0; i < pArray->size; ++i) {
fp(TARRAY_GET_ELEM(pArray, i));
}
pArray->size = 0;
}
void* taosArrayDestroy(SArray* pArray) { void* taosArrayDestroy(SArray* pArray) {
if (pArray) { if (pArray) {
taosMemoryFree(pArray->pData); taosMemoryFree(pArray->pData);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册