未验证 提交 4d314b56 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #10066 from taosdata/feature/qnode

Feature/qnode
......@@ -76,6 +76,13 @@ typedef enum {
HEARTBEAT_TYPE_MAX
} EHbType;
enum {
HEARTBEAT_KEY_DBINFO = 1,
HEARTBEAT_KEY_STBINFO,
HEARTBEAT_KEY_MQ_TMP,
};
typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT,
......@@ -147,7 +154,7 @@ typedef struct {
} SBuildTableMetaInput;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgVersion;
} SBuildUseDBInput;
......@@ -745,7 +752,7 @@ typedef struct {
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int64_t uid;
uint64_t uid;
int32_t vgVersion;
int32_t vgNum;
int8_t hashMethod;
......@@ -1340,9 +1347,8 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat
}
typedef struct {
int32_t keyLen;
int32_t key;
int32_t valueLen;
void* key;
void* value;
} SKv;
......@@ -1364,8 +1370,7 @@ typedef struct {
typedef struct {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
SArray* info; // Array<Skv>
} SClientHbRsp;
typedef struct {
......@@ -1384,9 +1389,26 @@ void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq);
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp);
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
void *pIter = taosHashIterate(info, NULL);
while (pIter != NULL) {
SKv* kv = (SKv*)pIter;
tfree(kv->value);
pIter = taosHashIterate(info, pIter);
}
}
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = (SClientHbReq*)pReq;
if (req->info) taosHashCleanup(req->info);
if (req->info) {
tFreeReqKvHash(req->info);
taosHashCleanup(req->info);
}
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
......@@ -1402,22 +1424,39 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
free(pReq);
}
static FORCE_INLINE void tFreeClientKv(void *pKv) {
SKv *kv = (SKv *)pKv;
if (kv) {
tfree(kv->value);
}
}
static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) {
SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
}
static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp;
taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp);
}
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp);
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKv->keyLen);
tlen += taosEncodeFixedI32(buf, pKv->key);
tlen += taosEncodeFixedI32(buf, pKv->valueLen);
tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen);
tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) {
buf = taosDecodeFixedI32(buf, &pKv->keyLen);
buf = taosDecodeFixedI32(buf, &pKv->key);
buf = taosDecodeFixedI32(buf, &pKv->valueLen);
buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen);
buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen);
return buf;
}
......
......@@ -59,6 +59,7 @@ typedef struct SSTableMetaVersion {
} SSTableMetaVersion;
typedef struct SDbVgVersion {
char dbName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion;
} SDbVgVersion;
......@@ -98,6 +99,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo);
/**
* Get a table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
......
......@@ -82,7 +82,7 @@ typedef struct STableMeta {
typedef struct SDBVgroupInfo {
SRWLatch lock;
int64_t dbId;
uint64_t dbId;
int32_t vgVersion;
int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
......@@ -94,7 +94,7 @@ typedef struct SUseDbOutput {
} SUseDbOutput;
enum {
META_TYPE_NON_TABLE = 1,
META_TYPE_NULL_TABLE = 1,
META_TYPE_CTABLE,
META_TYPE_TABLE,
META_TYPE_BOTH_TABLE
......@@ -163,7 +163,7 @@ extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSi
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
......
......@@ -35,9 +35,13 @@ extern "C" {
typedef struct SAppInstInfo SAppInstInfo;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef struct SHbConnInfo {
void *param;
SClientHbReq *req;
} SHbConnInfo;
typedef struct SAppHbMgr {
char *key;
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
......@@ -49,9 +53,15 @@ typedef struct SAppHbMgr {
SAppInstInfo* pAppInstInfo;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
} SAppHbMgr;
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req);
typedef struct SClientHbMgr {
int8_t inited;
// ctl
......@@ -59,12 +69,10 @@ typedef struct SClientHbMgr {
pthread_t thread;
pthread_mutex_t lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX];
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
typedef struct SQueryExecMetric {
int64_t start; // start timestamp
......@@ -218,11 +226,11 @@ void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo);
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
......
......@@ -127,6 +127,8 @@ void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj;
SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
pthread_mutex_destroy(&pTscObj->mutex);
......
......@@ -15,33 +15,237 @@
#include "clientInt.h"
#include "trpc.h"
#include "catalog.h"
#include "clientLog.h"
static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
static int32_t hbMqHbRspHandle(SClientHbRsp* pRsp) {
static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
return 0;
}
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0;
int32_t code = 0;
while (msgLen < valueLen) {
SUseDbRsp *rsp = (SUseDbRsp *)((char *)value + msgLen);
rsp->vgVersion = ntohl(rsp->vgVersion);
rsp->vgNum = ntohl(rsp->vgNum);
rsp->uid = be64toh(rsp->uid);
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
if (rsp->vgVersion < 0) {
SDbVgVersion dbInfo;
strcpy(dbInfo.dbName, rsp->db);
dbInfo.dbId = rsp->uid;
dbInfo.vgVersion = rsp->vgVersion;
code = catalogRemoveDBVgroup(pCatalog, &dbInfo);
} else {
SDBVgroupInfo vgInfo = {0};
vgInfo.dbId = rsp->uid;
vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgInfo) {
tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < rsp->vgNum; ++i) {
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId);
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin);
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) {
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgInfo);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
if (code) {
taosHashCleanup(vgInfo.vgInfo);
}
}
if (code) {
return code;
}
msgLen += sizeof(SUseDbRsp) + rsp->vgNum * sizeof(SVgroupInfo);
}
return TSDB_CODE_SUCCESS;
}
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType);
return TSDB_CODE_SUCCESS;
}
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
tscDebug("hb got %d rsp kv", kvNum);
for (int32_t i = 0; i < kvNum; ++i) {
SKv *kv = taosArrayGet(pRsp->info, i);
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
break;
}
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
case HEARTBEAT_KEY_STBINFO:
break;
default:
tscError("invalid hb key type:%d", kv->key);
break;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t emptyRspNum = 0;
if (code != 0) {
tfree(param);
return -1;
}
SClientHbRsp* pRsp = (SClientHbRsp*) pMsg->pData;
return hbMqHbRspHandle(pRsp);
char *key = (char *)param;
SClientHbBatchRsp pRsp = {0};
tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp);
int32_t rspNum = taosArrayGetSize(pRsp.rsps);
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL || NULL == *pInst) {
tscError("cluster not exist, key:%s", key);
tfree(param);
tFreeClientHbBatchRsp(&pRsp);
return -1;
}
tfree(param);
if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
} else {
atomic_add_fetch_32(&emptyRspNum, 1);
}
for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i);
code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp);
if (code) {
break;
}
}
tFreeClientHbBatchRsp(&pRsp);
return code;
}
void hbMgrInitMqHbRspHandle() {
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SDbVgVersion *dbs = NULL;
uint32_t dbNum = 0;
int32_t code = 0;
code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (dbNum <= 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < dbNum; ++i) {
SDbVgVersion *db = &dbs[i];
db->dbId = htobe64(db->dbId);
db->vgVersion = htonl(db->vgVersion);
}
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs};
tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
}
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
return code;
}
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
return TSDB_CODE_SUCCESS;
}
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
}
void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
}
static FORCE_INLINE void hbMgrInitHandle() {
// init all handle
hbMgrInitMqHbRspHandle();
hbMgrInitMqHbHandle();
}
void hbFreeReq(void *req) {
SClientHbReq *pReq = (SClientHbReq *)req;
tFreeReqKvHash(pReq->info);
}
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) {
......@@ -51,38 +255,58 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
int32_t code = 0;
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq* pOneReq = pIter;
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq);
if (code) {
taosHashCancelIterate(pAppHbMgr->activeInfo, pIter);
break;
}
}
taosArrayPush(pBatchReq->reqs, pOneReq);
taosHashClear(pOneReq->info);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
#if 0
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
SArray* pArray = getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
if (code) {
taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
tfree(pBatchReq);
}
#endif
return pBatchReq;
}
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq* pOneReq = pIter;
tFreeReqKvHash(pOneReq->info);
taosHashClear(pOneReq->info);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
}
static void* hbThreadFunc(void* param) {
setThreadName("hb");
while (1) {
int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop);
if(threadStop) {
int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2);
if(1 == threadStop) {
break;
}
pthread_mutex_lock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for(int i = 0; i < sz; i++) {
SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
......@@ -98,7 +322,9 @@ static void* hbThreadFunc(void* param) {
int tlen = tSerializeSClientHbBatchReq(NULL, pReq);
void *buf = malloc(tlen);
if (buf == NULL) {
//TODO: error handling
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
break;
}
void *abuf = buf;
......@@ -107,6 +333,7 @@ static void* hbThreadFunc(void* param) {
if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
free(buf);
break;
}
......@@ -114,7 +341,7 @@ static void* hbThreadFunc(void* param) {
pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = NULL;
pInfo->param = strdup(pAppHbMgr->key);
pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0;
......@@ -123,9 +350,13 @@ static void* hbThreadFunc(void* param) {
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
pthread_mutex_unlock(&clientHbMgr.lock);
taosMsleep(HEARTBEAT_INTERVAL);
}
return NULL;
......@@ -145,10 +376,19 @@ static int32_t hbCreateThread() {
}
static void hbStopThread() {
atomic_store_8(&clientHbMgr.threadStop, 1);
if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
tscDebug("hb thread already stopped");
return;
}
while (2 != atomic_load_8(&clientHbMgr.threadStop)) {
usleep(10);
}
tscDebug("hb thread stopped");
}
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
hbMgrInit();
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) {
......@@ -160,6 +400,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
pAppHbMgr->connKeyCnt = 0;
pAppHbMgr->reportCnt = 0;
pAppHbMgr->reportBytes = 0;
pAppHbMgr->key = strdup(key);
// init app info
pAppHbMgr->pAppInstInfo = pAppInstInfo;
......@@ -174,19 +415,26 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
}
pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
// init getInfoFunc
pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (pAppHbMgr->getInfoFuncs == NULL) {
if (pAppHbMgr->connInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
free(pAppHbMgr);
return NULL;
}
pthread_mutex_lock(&clientHbMgr.lock);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
pthread_mutex_unlock(&clientHbMgr.lock);
return pAppHbMgr;
}
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
if (NULL == pAppHbMgr) {
return;
}
pthread_mutex_lock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
......@@ -194,7 +442,9 @@ void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == pTarget) {
taosHashCleanup(pTarget->activeInfo);
taosHashCleanup(pTarget->getInfoFuncs);
pTarget->activeInfo = NULL;
taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL;
}
}
......@@ -219,31 +469,20 @@ int hbMgrInit() {
}
void hbMgrCleanUp() {
hbStopThread();
// destroy all appHbMgr
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return;
pthread_mutex_lock(&clientHbMgr.lock);
taosArrayDestroy(clientHbMgr.appHbMgrs);
}
int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
int64_t reqId = hbRsp->reqId;
int64_t rspId = hbRsp->rspId;
pthread_mutex_unlock(&clientHbMgr.lock);
SArray* rsps = hbRsp->rsps;
int32_t sz = taosArrayGetSize(rsps);
for (int i = 0; i < sz; i++) {
SClientHbRsp* pRsp = taosArrayGet(rsps, i);
if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) {
clientHbMgr.handle[pRsp->connKey.hbType](pRsp);
} else {
// discard rsp
}
}
return 0;
clientHbMgr.appHbMgrs = NULL;
}
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) {
int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
// init hash in activeinfo
void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) {
......@@ -252,20 +491,53 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func
SClientHbReq hbReq;
hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
// init hash
if (func != NULL) {
taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo));
if (info != NULL) {
SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
info->req = pReq;
taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
}
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
return 0;
}
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0};
switch (hbType) {
case HEARTBEAT_TYPE_QUERY: {
int64_t *pClusterId = malloc(sizeof(int64_t));
*pClusterId = clusterId;
info.param = pClusterId;
break;
}
case HEARTBEAT_TYPE_MQ: {
break;
}
default:
break;
}
return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
}
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey));
int32_t code = 0;
code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
if (code) {
return;
}
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
if (atomic_load_32(&pAppHbMgr->connKeyCnt) <= 0) {
appHbMgrCleanup(pAppHbMgr);
}
}
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
......
......@@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
/*p->pAppHbMgr = appHbMgrInit(p);*/
p->pAppHbMgr = appHbMgrInit(p, key);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p;
......@@ -472,13 +472,8 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
return NULL;
}
SKv kv = {0};
kv.key = malloc(256);
if (kv.key == NULL) {
taosArrayDestroy(pArray);
return NULL;
}
strcpy(kv.key, "mq-tmp");
kv.keyLen = strlen("mq-tmp") + 1;
kv.key = HEARTBEAT_KEY_MQ_TMP;
SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
if (pMqHb == NULL) {
return pArray;
......
......@@ -46,6 +46,8 @@ void taos_cleanup(void) {
clientConnRefPool = -1;
taosCloseRef(id);
hbMgrCleanUp();
rpcCleanup();
catalogDestroy();
taosCloseLog();
......
......@@ -18,6 +18,7 @@
#include "tname.h"
#include "clientInt.h"
#include "clientLog.h"
#include "catalog.h"
int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
......@@ -73,8 +74,9 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
/*SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};*/
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/
pTscObj->connType = HEARTBEAT_TYPE_QUERY;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
......@@ -290,7 +292,6 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo: Remove cache in catalog cache.
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
......@@ -298,6 +299,18 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData;
SDbVgVersion dbVer = {0};
struct SCatalog *pCatalog = NULL;
strncpy(dbVer.dbName, rsp->db, sizeof(dbVer.dbName));
dbVer.dbId = be64toh(rsp->uid);
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveDBVgroup(pCatalog, &dbVer);
tsem_post(&pRequest->body.rspSem);
return code;
}
......
......@@ -91,13 +91,11 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int32_t kvNum = taosHashGetSize(pReq->info);
tlen += taosEncodeFixedI32(buf, kvNum);
SKv kv;
SKv *kv;
void* pIter = taosHashIterate(pReq->info, NULL);
while (pIter != NULL) {
taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen);
kv.valueLen = taosHashGetDataLen(pIter);
kv.value = pIter;
tlen += taosEncodeSKv(buf, &kv);
kv = pIter;
tlen += taosEncodeSKv(buf, kv);
pIter = taosHashIterate(pReq->info, pIter);
}
......@@ -116,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
for(int i = 0; i < kvNum; i++) {
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen);
taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
}
return buf;
......@@ -124,17 +122,28 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp) {
int tlen = 0;
int32_t kvNum = taosArrayGetSize(pRsp->info);
tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey);
tlen += taosEncodeFixedI32(buf, pRsp->status);
tlen += taosEncodeFixedI32(buf, pRsp->bodyLen);
tlen += taosEncodeBinary(buf, pRsp->body, pRsp->bodyLen);
tlen += taosEncodeFixedI32(buf, kvNum);
for (int i = 0; i < kvNum; i++) {
SKv *kv = (SKv *)taosArrayGet(pRsp->info, i);
tlen += taosEncodeSKv(buf, kv);
}
return tlen;
}
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp) {
int32_t kvNum = 0;
buf = taosDecodeSClientHbKey(buf, &pRsp->connKey);
buf = taosDecodeFixedI32(buf, &pRsp->status);
buf = taosDecodeFixedI32(buf, &pRsp->bodyLen);
buf = taosDecodeBinary(buf, &pRsp->body, pRsp->bodyLen);
buf = taosDecodeFixedI32(buf, &kvNum);
pRsp->info = taosArrayInit(kvNum, sizeof(SKv));
for (int i = 0; i < kvNum; i++) {
SKv kv = {0};
buf = taosDecodeSKv(buf, &kv);
taosArrayPush(pRsp->info, &kv);
}
return buf;
}
......@@ -155,6 +164,7 @@ void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pBatchReq) {
if (pBatchReq->reqs == NULL) {
pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq));
}
int32_t reqNum;
buf = taosDecodeFixedI32(buf, &reqNum);
for (int i = 0; i < reqNum; i++) {
......
......@@ -26,6 +26,7 @@ int32_t mndInitDb(SMnode *pMnode);
void mndCleanupDb(SMnode *pMnode);
SDbObj *mndAcquireDb(SMnode *pMnode, char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen);
#ifdef __cplusplus
}
......
......@@ -813,30 +813,10 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
SUseDbReq *pUse = pReq->rpcMsg.pCont;
pUse->vgVersion = htonl(pUse->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mError("db:%s, failed to process use db req since %s", pUse->db, terrstr());
return -1;
}
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
SUseDbRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgList, int32_t *vgNum) {
int32_t vindex = 0;
SSdb *pSdb = pMnode->pSdb;
if (pUse->vgVersion < pDb->vgVersion) {
void *pIter = NULL;
while (vindex < pDb->cfg.numOfVgroups) {
SVgObj *pVgroup = NULL;
......@@ -844,7 +824,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
SVgroupInfo *pInfo = &pRsp->vgroupInfo[vindex];
SVgroupInfo *pInfo = &vgList[vindex];
pInfo->vgId = htonl(pVgroup->vgId);
pInfo->hashBegin = htonl(pVgroup->hashBegin);
pInfo->hashEnd = htonl(pVgroup->hashEnd);
......@@ -867,12 +847,41 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
sdbRelease(pSdb, pVgroup);
}
*vgNum = vindex;
}
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
SUseDbReq *pUse = pReq->rpcMsg.pCont;
pUse->vgVersion = htonl(pUse->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mError("db:%s, failed to process use db req since %s", pUse->db, terrstr());
return -1;
}
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
SUseDbRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t vgNum = 0;
if (pUse->vgVersion < pDb->vgVersion) {
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
}
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vindex);
pRsp->vgNum = htonl(vgNum);
pRsp->hashMethod = pDb->hashMethod;
pReq->pCont = pRsp;
......@@ -882,6 +891,77 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
return 0;
}
int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen) {
SSdb *pSdb = pMnode->pSdb;
int32_t bufSize = num * (sizeof(SUseDbRsp) + TSDB_DEFAULT_VN_PER_DB * sizeof(SVgroupInfo));
void *buf = malloc(bufSize);
int32_t len = 0;
int32_t contLen = 0;
int32_t bufOffset = 0;
SUseDbRsp *pRsp = NULL;
for (int32_t i = 0; i < num; ++i) {
SDbVgVersion *db = &dbs[i];
db->dbId = be64toh(db->dbId);
db->vgVersion = ntohl(db->vgVersion);
len = 0;
SDbObj *pDb = mndAcquireDb(pMnode, db->dbName);
if (pDb == NULL) {
mInfo("db %s not exist", db->dbName);
len = sizeof(SUseDbRsp);
} else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) {
len = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
}
if (0 == len) {
mndReleaseDb(pMnode, pDb);
continue;
}
contLen += len;
if (contLen > bufSize) {
buf = realloc(buf, contLen);
}
pRsp = (SUseDbRsp *)((char *)buf + bufOffset);
memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN);
if (pDb) {
int32_t vgNum = 0;
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vgNum);
pRsp->hashMethod = pDb->hashMethod;
} else {
pRsp->uid = htobe64(db->dbId);
pRsp->vgNum = htonl(0);
pRsp->hashMethod = 0;
pRsp->vgVersion = htonl(-1);
}
bufOffset += len;
mndReleaseDb(pMnode, pDb);
}
if (contLen > 0) {
*rsp = buf;
*rspLen = contLen;
} else {
*rsp = NULL;
tfree(buf);
*rspLen = 0;
}
return 0;
}
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSyncDbReq *pSync = pReq->rpcMsg.pCont;
......
......@@ -354,7 +354,41 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
for (int i = 0; i < sz; i++) {
SClientHbReq* pHbReq = taosArrayGet(pArray, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) {
continue;
}
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};
void *pIter = taosHashIterate(pHbReq->info, NULL);
while (pIter != NULL) {
SKv* kv = pIter;
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv);
}
break;
}
case HEARTBEAT_KEY_STBINFO:
break;
default:
mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_MND_APP_ERROR;
break;
}
pIter = taosHashIterate(pHbReq->info, pIter);
}
taosArrayPush(batchRsp.rsps, &hbRsp);
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
if (pRsp != NULL) {
......@@ -369,6 +403,18 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
void* buf = rpcMallocCont(tlen);
void* abuf = buf;
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info): 0;
for (int32_t n = 0; n < kvNum; ++n) {
SKv *kv = taosArrayGet(rsp->info, n);
tfree(kv->value);
}
taosArrayDestroy(rsp->info);
}
taosArrayDestroy(batchRsp.rsps);
pReq->contLen = tlen;
pReq->pCont = buf;
......
......@@ -102,11 +102,10 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ};
req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
SKv kv;
kv.key = (void*)"abc";
kv.keyLen = 4;
kv.key = 123;
kv.value = (void*)"bcd";
kv.valueLen = 4;
taosHashPut(req.info, kv.key, kv.keyLen, kv.value, kv.valueLen);
taosHashPut(req.info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
taosArrayPush(batchReq.reqs, &req);
int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq);
......
......@@ -31,7 +31,7 @@ extern "C" {
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_RENT_SLOT_SECOND 2
#define CTG_RENT_SLOT_SECOND 1.5
#define CTG_DEFAULT_INVALID_VERSION (-1)
......@@ -115,7 +115,7 @@ typedef struct SCatalogMgmt {
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE)
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE)
......
......@@ -241,7 +241,7 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NONE(output->metaType);
SET_META_TYPE_NULL(output->metaType);
ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
......@@ -299,7 +299,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NONE(output->metaType);
SET_META_TYPE_NULL(output->metaType);
ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName));
return TSDB_CODE_SUCCESS;
}
......@@ -427,9 +427,9 @@ int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
}
int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
} else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
......@@ -494,14 +494,14 @@ int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t s
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
taosArraySort(slot->meta, compare);
slot->needSort = false;
qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type);
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
......@@ -526,6 +526,42 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentRemove(SMetaRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
taosArraySort(slot->meta, compare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ);
if (idx < 0) {
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
taosArrayRemove(slot->meta, idx);
qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
if (ridx >= mgmt->slotNum) {
......@@ -600,7 +636,7 @@ int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t s
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
int32_t code = 0;
if (NULL == output->tbMeta) {
if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -721,7 +757,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
}
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo) {
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
......@@ -747,6 +783,47 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD
return TSDB_CODE_SUCCESS;
}
int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target, bool *removed) {
*removed = false;
SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName));
if (NULL == info) {
ctgInfo("db not exist in dbCache, may be removed, db:%s", target->dbName);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_WRITE, &info->lock);
if (info->dbId != target->dbId) {
ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId);
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
return TSDB_CODE_SUCCESS;
}
if (info->vgInfo) {
ctgInfo("cleanup db vgInfo, db:%s", target->dbName);
taosHashCleanup(info->vgInfo);
info->vgInfo = NULL;
}
if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) {
ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName);
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
*removed = true;
return TSDB_CODE_SUCCESS;
}
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
......@@ -767,7 +844,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
// if get from mnode failed, will not try vnode
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));
if (CTG_IS_META_NONE(moutput.metaType)) {
if (CTG_IS_META_NULL(moutput.metaType)) {
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
} else {
output = &moutput;
......@@ -783,6 +860,8 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
voutput.metaType = moutput.metaType;
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
......@@ -792,20 +871,22 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
if (0 == exist) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
if (CTG_IS_META_NONE(moutput.metaType)) {
SET_META_TYPE_NONE(voutput.metaType);
if (CTG_IS_META_NULL(moutput.metaType)) {
SET_META_TYPE_NULL(voutput.metaType);
}
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
} else {
tfree(voutput.tbMeta);
SET_META_TYPE_CTABLE(voutput.metaType);
}
}
}
if (CTG_IS_META_NONE(output->metaType)) {
if (CTG_IS_META_NULL(output->metaType)) {
ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName));
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
......@@ -1118,19 +1199,6 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (dbInfo->vgVersion < 0) {
ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion);
if (pCatalog->dbCache.cache) {
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
}
ctgWarn("db removed from cache, db:%s", dbName);
goto _return;
}
if (NULL == pCatalog->dbCache.cache) {
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
......@@ -1142,10 +1210,12 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
taosHashCleanup(cache);
}
} else {
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
CTG_ERR_JRET(ctgValidateAndFreeDbInfo(pCatalog, dbName, dbInfo));
}
bool newAdded = false;
dbInfo->lock = 0;
if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
......@@ -1154,6 +1224,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
dbInfo->vgInfo = NULL;
SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
strncpy(vgVersion.dbName, dbName, sizeof(vgVersion.dbName));
if (newAdded) {
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
} else {
......@@ -1173,6 +1245,34 @@ _return:
CTG_RET(code);
}
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) {
int32_t code = 0;
bool removed = false;
if (NULL == pCatalog || NULL == dbInfo) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCatalog->dbCache.cache) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo, &removed));
if (!removed) {
return TSDB_CODE_SUCCESS;
}
ctgInfo("db removed from cache, db:%s", dbInfo->dbName);
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbInfo->dbId, ctgDbVgVersionCompare));
ctgDebug("db removed from rent, db:%s", dbInfo->dbName);
CTG_RET(code);
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1);
}
......@@ -1275,6 +1375,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup));
_return:
if (dbInfo) {
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
......
......@@ -59,7 +59,7 @@ int32_t ctgTestTagNum = 1;
int32_t ctgTestSVersion = 1;
int32_t ctgTestTVersion = 1;
int32_t ctgTestSuid = 2;
int64_t ctgTestDbId = 33;
uint64_t ctgTestDbId = 33;
uint64_t ctgTestClusterId = 0x1;
char *ctgTestDbname = "1.db1";
......
......@@ -186,6 +186,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
*pQueryEnd = pDispatcher->queryEnd;
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
......
......@@ -410,7 +410,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk")
// catalog
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog interval error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册