提交 03a366fa 编写于 作者: D dapan1121

hb process

上级 b024db90
......@@ -76,6 +76,12 @@ typedef enum {
HEARTBEAT_TYPE_MAX
} EHbType;
typedef enum {
HEARTBEAT_KEY_DBINFO = 1,
HEARTBEAT_KEY_STBINFO,
};
typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT,
......@@ -1335,9 +1341,8 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat
}
typedef struct {
int32_t keyLen;
int32_t key;
int32_t valueLen;
void* key;
void* value;
} SKv;
......@@ -1359,8 +1364,7 @@ typedef struct {
typedef struct {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
SArray* info; // Array<Skv>
} SClientHbRsp;
typedef struct {
......@@ -1402,17 +1406,15 @@ void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKv->keyLen);
tlen += taosEncodeFixedI32(buf, pKv->key);
tlen += taosEncodeFixedI32(buf, pKv->valueLen);
tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen);
tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) {
buf = taosDecodeFixedI32(buf, &pKv->keyLen);
buf = taosDecodeFixedI32(buf, &pKv->key);
buf = taosDecodeFixedI32(buf, &pKv->valueLen);
buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen);
buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen);
return buf;
}
......
......@@ -94,7 +94,7 @@ typedef struct SUseDbOutput {
} SUseDbOutput;
enum {
META_TYPE_NON_TABLE = 1,
META_TYPE_NULL_TABLE = 1,
META_TYPE_CTABLE,
META_TYPE_TABLE,
META_TYPE_BOTH_TABLE
......@@ -174,7 +174,7 @@ extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSi
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
......
......@@ -37,7 +37,15 @@ typedef struct SAppInstInfo SAppInstInfo;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req);
typedef struct SHbConnInfo {
void *param;
SClientHbReq *req;
} SHbConnInfo;
typedef struct SAppHbMgr {
char *key;
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
......@@ -49,7 +57,7 @@ typedef struct SAppHbMgr {
SAppInstInfo* pAppInstInfo;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
} SAppHbMgr;
typedef struct SClientHbMgr {
......@@ -59,12 +67,10 @@ typedef struct SClientHbMgr {
pthread_t thread;
pthread_mutex_t lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX];
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
typedef struct SQueryExecMetric {
int64_t start; // start timestamp
......
......@@ -21,27 +21,136 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
static int32_t hbMqHbRspHandle(SClientHbRsp* pRsp) {
static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
return 0;
}
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType);
return TSDB_CODE_SUCCESS;
}
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
for (int32_t i = 0; i < kvNum; ++i) {
SKv *kv = taosArrayGet(pRsp->info, i);
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO:
break;
case HEARTBEAT_KEY_STBINFO:
break;
default:
tscError("invalid hb key type:%d", kv->key);
break;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) {
if (code != 0) {
return -1;
}
SClientHbRsp* pRsp = (SClientHbRsp*) pMsg->pData;
return hbMqHbRspHandle(pRsp);
char *key = (char *)param;
SClientHbBatchRsp* pRsp = (SClientHbBatchRsp*) pMsg->pData;
int32_t reqNum = taosArrayGetSize(pRsp->rsps);
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL || NULL == *pInst) {
tscError("cluster not exist, key:%s", key);
return -1;
}
for (int32_t i = 0; i < reqNum; ++i) {
SClientHbRsp* rsp = taosArrayGet(pRsp->rsps, i);
code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp);
if (code) {
break;
}
}
return code;
}
void hbMgrInitMqHbRspHandle() {
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SDbVgVersion *dbs = NULL;
uint32_t dbNum = 0;
int32_t code = 0;
code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
for (int32_t i = 0; i < dbNum; ++i) {
SDbVgVersion *db = &dbs[i];
db->dbId = htobe64(db->dbId);
db->vgVersion = htonl(db->vgVersion);
}
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs};
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
}
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
return code;
}
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
return TSDB_CODE_SUCCESS;
}
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
}
void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
}
static FORCE_INLINE void hbMgrInitHandle() {
// init all handle
hbMgrInitMqHbRspHandle();
hbMgrInitMqHbHandle();
}
void hbFreeReqKvHash(SHashObj* info) {
void *pIter = taosHashIterate(info, NULL);
while (pIter != NULL) {
SKv* kv = pIter;
tfree(kv->value);
pIter = taosHashIterate(info, pIter);
}
}
void hbFreeReq(SClientHbReq *req) {
hbFreeReqKvHash(req->info);
}
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) {
......@@ -51,30 +160,48 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
int32_t code = 0;
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq* pOneReq = pIter;
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq);
if (code) {
taosHashCancelIterate(pAppHbMgr->activeInfo, pIter);
break;
}
}
taosArrayPush(pBatchReq->reqs, pOneReq);
taosHashClear(pOneReq->info);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
#if 0
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
SArray* pArray = getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
if (code) {
taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
tfree(pBatchReq);
}
#endif
return pBatchReq;
}
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq* pOneReq = pIter;
hbFreeReqKvHash(pOneReq->info);
taosHashClear(pOneReq->info);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
}
static void* hbThreadFunc(void* param) {
setThreadName("hb");
while (1) {
......@@ -98,7 +225,9 @@ static void* hbThreadFunc(void* param) {
int tlen = tSerializeSClientHbBatchReq(NULL, pReq);
void *buf = malloc(tlen);
if (buf == NULL) {
//TODO: error handling
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
break;
}
void *abuf = buf;
......@@ -107,6 +236,7 @@ static void* hbThreadFunc(void* param) {
if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
free(buf);
break;
}
......@@ -114,7 +244,7 @@ static void* hbThreadFunc(void* param) {
pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = NULL;
pInfo->param = strdup(pAppHbMgr->key);
pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0;
......@@ -148,7 +278,7 @@ static void hbStopThread() {
atomic_store_8(&clientHbMgr.threadStop, 1);
}
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
hbMgrInit();
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) {
......@@ -160,6 +290,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
pAppHbMgr->connKeyCnt = 0;
pAppHbMgr->reportCnt = 0;
pAppHbMgr->reportBytes = 0;
pAppHbMgr->key = strdup(key);
// init app info
pAppHbMgr->pAppInstInfo = pAppInstInfo;
......@@ -243,7 +374,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
return 0;
}
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) {
int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
// init hash in activeinfo
void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) {
......@@ -252,16 +383,42 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func
SClientHbReq hbReq;
hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
// init hash
if (func != NULL) {
taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo));
if (info != NULL) {
SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
info->req = pReq;
taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
}
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
return 0;
}
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0};
switch (hbType) {
case HEARTBEAT_TYPE_QUERY: {
int64_t *pClusterId = malloc(sizeof(int64_t));
*pClusterId = clusterId;
info.param = pClusterId;
break;
}
case HEARTBEAT_TYPE_MQ: {
break;
}
default:
break;
}
return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
}
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey));
......
......@@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p);
p->pAppHbMgr = appHbMgrInit(p, key);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p;
......
......@@ -71,8 +71,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
......
......@@ -91,13 +91,11 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int32_t kvNum = taosHashGetSize(pReq->info);
tlen += taosEncodeFixedI32(buf, kvNum);
SKv kv;
SKv *kv;
void* pIter = taosHashIterate(pReq->info, NULL);
while (pIter != NULL) {
taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen);
kv.valueLen = taosHashGetDataLen(pIter);
kv.value = pIter;
tlen += taosEncodeSKv(buf, &kv);
kv = pIter;
tlen += taosEncodeSKv(buf, kv);
pIter = taosHashIterate(pReq->info, pIter);
}
......@@ -116,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
for(int i = 0; i < kvNum; i++) {
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen);
taosHashPut(pReq->info, kv.key, sizeof(kv.key), kv.value, kv.valueLen);
}
return buf;
......@@ -124,17 +122,28 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp) {
int tlen = 0;
int32_t kvNum = taosArrayGetSize(pRsp->info);
tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey);
tlen += taosEncodeFixedI32(buf, pRsp->status);
tlen += taosEncodeFixedI32(buf, pRsp->bodyLen);
tlen += taosEncodeBinary(buf, pRsp->body, pRsp->bodyLen);
tlen += taosEncodeFixedI32(buf, kvNum);
for (int i = 0; i < kvNum; i++) {
SKv *kv = (SKv *)taosArrayGet(pRsp->info, i);
tlen += taosEncodeSKv(buf, kv);
}
return tlen;
}
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp) {
int32_t kvNum = 0;
buf = taosDecodeSClientHbKey(buf, &pRsp->connKey);
buf = taosDecodeFixedI32(buf, &pRsp->status);
buf = taosDecodeFixedI32(buf, &pRsp->bodyLen);
buf = taosDecodeBinary(buf, &pRsp->body, pRsp->bodyLen);
buf = taosDecodeFixedI32(buf, &kvNum);
pRsp->info = taosArrayInit(kvNum, sizeof(SKv));
for (int i = 0; i < kvNum; i++) {
SKv kv = {0};
buf = taosDecodeSKv(buf, &kv);
taosArrayPush(pRsp->info, &kv);
}
return buf;
}
......@@ -155,6 +164,7 @@ void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pBatchReq) {
if (pBatchReq->reqs == NULL) {
pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq));
}
int32_t reqNum;
buf = taosDecodeFixedI32(buf, &reqNum);
for (int i = 0; i < reqNum; i++) {
......
......@@ -805,30 +805,10 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
SUseDbReq *pUse = pReq->rpcMsg.pCont;
pUse->vgVersion = htonl(pUse->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mError("db:%s, failed to process use db req since %s", pUse->db, terrstr());
return -1;
}
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
SUseDbRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgList, int32_t *vgNum) {
int32_t vindex = 0;
SSdb *pSdb = pMnode->pSdb;
if (pUse->vgVersion < pDb->vgVersion) {
void *pIter = NULL;
while (vindex < pDb->cfg.numOfVgroups) {
SVgObj *pVgroup = NULL;
......@@ -836,7 +816,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
SVgroupInfo *pInfo = &pRsp->vgroupInfo[vindex];
SVgroupInfo *pInfo = &vgList[vindex];
pInfo->vgId = htonl(pVgroup->vgId);
pInfo->hashBegin = htonl(pVgroup->hashBegin);
pInfo->hashEnd = htonl(pVgroup->hashEnd);
......@@ -859,12 +839,41 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
sdbRelease(pSdb, pVgroup);
}
*vgNum = vindex;
}
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
SUseDbReq *pUse = pReq->rpcMsg.pCont;
pUse->vgVersion = htonl(pUse->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mError("db:%s, failed to process use db req since %s", pUse->db, terrstr());
return -1;
}
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
SUseDbRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t vgNum = 0;
if (pUse->vgVersion < pDb->vgVersion) {
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
}
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vindex);
pRsp->vgNum = htonl(vgNum);
pRsp->hashMethod = pDb->hashMethod;
pReq->pCont = pRsp;
......@@ -874,6 +883,72 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
return 0;
}
int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen) {
SSdb *pSdb = pMnode->pSdb;
int32_t bufSize = num * (sizeof(SUseDbRsp) + TSDB_DEFAULT_VN_PER_DB * sizeof(SVgroupInfo));
void *buf = malloc(bufSize);
int32_t len = 0;
int32_t contLen = 0;
int32_t bufOffset = 0;
SUseDbRsp *pRsp = NULL;
for (int32_t i = 0; i < num; ++i) {
SDbVgVersion *db = &dbs[i];
len = 0;
SDbObj *pDb = mndAcquireDb(pMnode, db->dbName);
if (pDb == NULL) {
mInfo("db %s not exist", db->dbName);
len = sizeof(SUseDbRsp);
} else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) {
len = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
}
if (0 == len) {
mndReleaseDb(pMnode, pDb);
continue;
}
contLen += len;
if (contLen > bufSize) {
buf = realloc(buf, contLen);
}
pRsp = (SUseDbRsp *)((char *)buf + bufOffset);
memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN);
if (pDb) {
int32_t vgNum = 0;
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vgNum);
pRsp->hashMethod = pDb->hashMethod;
} else {
pRsp->vgVersion = htonl(-1);
}
bufOffset += len;
mndReleaseDb(pMnode, pDb);
}
if (contLen > 0) {
*rsp = buf;
*rspLen = contLen;
} else {
*rsp = NULL;
tfree(buf);
*rspLen = 0;
}
return 0;
}
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSyncDbReq *pSync = pReq->rpcMsg.pCont;
......
......@@ -346,12 +346,46 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
int sz = taosArrayGetSize(pArray);
SClientHbBatchRsp batchRsp = {0};
batchRsp.key = batchReq.key;
batchRsp.keyLen = batchReq.keyLen;
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) {
SClientHbReq* pHbReq = taosArrayGet(pArray, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) {
continue;
}
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};
void *pIter = taosHashIterate(pHbReq->info, NULL);
while (pIter != NULL) {
SKv* kv = pIter;
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO:
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp->info, &kv);
taosArrayPush(batchRsp.rsps, &hbRsp);
}
break;
case HEARTBEAT_KEY_STBINFO:
break;
default:
mError("invalid kv key:%d", kv->key);
break;
}
pIter = taosHashIterate(pHbReq->info, pIter);
}
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
if (pRsp != NULL) {
......@@ -366,6 +400,19 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
void* buf = rpcMallocCont(tlen);
void* abuf = buf;
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info): 0;
for (int32_t n = 0; n < kvNum; ++n) {
SKv *kv = taosArrayGet(rsp->info, n);
tfree(kv->value);
}
taosArrayDestroy(rsp->info);
}
tfree(batchRsp.key);
taosArrayDestroy(batchRsp.rsps);
pReq->contLen = tlen;
pReq->pCont = buf;
......
......@@ -115,7 +115,7 @@ typedef struct SCatalogMgmt {
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE)
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE)
......
......@@ -253,7 +253,7 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NONE(output->metaType);
SET_META_TYPE_NULL(output->metaType);
ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
......@@ -315,7 +315,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NONE(output->metaType);
SET_META_TYPE_NULL(output->metaType);
ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName));
return TSDB_CODE_SUCCESS;
}
......@@ -510,14 +510,14 @@ int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t s
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
taosArraySort(slot->meta, compare);
slot->needSort = false;
qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type);
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
......@@ -542,6 +542,42 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentRemove(SMetaRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
taosArraySort(slot->meta, compare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ);
if (idx < 0) {
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
taosArrayRemove(slot->meta, idx);
qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
if (ridx >= mgmt->slotNum) {
......@@ -763,15 +799,20 @@ int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName,
return TSDB_CODE_SUCCESS;
}
int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target) {
int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target, bool *removed) {
*removed = false;
SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName));
if (info) {
if (NULL == info) {
ctgInfo("db not exist in dbCache, may be removed, db:%s", target->dbName);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_WRITE, &info->lock);
if (info->dbId != target->dbId) {
ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId);
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
return TSDB_CODE_SUCCESS;
}
......@@ -779,7 +820,6 @@ int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* targ
ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion);
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
return TSDB_CODE_SUCCESS;
}
......@@ -799,7 +839,8 @@ int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* targ
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
}
*removed = true;
return TSDB_CODE_SUCCESS;
}
......@@ -825,7 +866,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
// if get from mnode failed, will not try vnode
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));
if (CTG_IS_META_NONE(moutput.metaType)) {
if (CTG_IS_META_NULL(moutput.metaType)) {
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
} else {
output = &moutput;
......@@ -841,6 +882,8 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
voutput.metaType = moutput.metaType;
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
......@@ -850,20 +893,22 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
if (0 == exist) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
if (CTG_IS_META_NONE(moutput.metaType)) {
SET_META_TYPE_NONE(voutput.metaType);
if (CTG_IS_META_NULL(moutput.metaType)) {
SET_META_TYPE_NULL(voutput.metaType);
}
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
} else {
tfree(voutput.tbMeta);
SET_META_TYPE_CTABLE(voutput.metaType);
}
}
}
if (CTG_IS_META_NONE(output->metaType)) {
if (CTG_IS_META_NULL(output->metaType)) {
ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName));
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
......@@ -1221,43 +1266,26 @@ _return:
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) {
int32_t code = 0;
bool removed = false;
if (NULL == pCatalog || NULL == dbInfo) {
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (pCatalog->dbCache.cache) {
CTG_ERR_JRET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo));
CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
}
ctgWarn("db removed from cache, db:%s", dbName);
bool newAdded = false;
if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
if (NULL == pCatalog->dbCache.cache) {
return TSDB_CODE_SUCCESS;
}
dbInfo->vgInfo = NULL;
SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
if (newAdded) {
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
} else {
CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
CTG_ERR_RET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo, &removed));
if (!removed) {
return TSDB_CODE_SUCCESS;
}
ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);
ctgInfo("db removed from cache, db:%s", dbInfo->dbName);
_return:
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbInfo->dbId, ctgDbVgVersionCompare));
if (dbInfo && dbInfo->vgInfo) {
taosHashCleanup(dbInfo->vgInfo);
dbInfo->vgInfo = NULL;
}
ctgDebug("db removed from rent, db:%s", dbInfo->dbName);
CTG_RET(code);
}
......@@ -1365,6 +1393,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup));
_return:
if (dbInfo) {
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册