提交 57d34170 编写于 作者: L Liu Jicong

implement client heartbeat

上级 7583b45e
......@@ -140,6 +140,46 @@ typedef struct SKlv {
void* value;
} SKlv;
typedef struct SClientHbKey {
int32_t connId;
int32_t hbType;
} SClientHbKey;
typedef struct SClientHbReq {
SClientHbKey connKey;
SHashObj* info; // hash<Slv.key, Sklv>
} SClientHbReq;
typedef struct SClientHbBatchReq {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
typedef struct SClientHbRsp {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
return taosIntHash_64(key, keyLen);
}
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = pReq;
taosHashCleanup(req->info);
free(pReq);
}
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
......@@ -156,10 +196,6 @@ static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
return buf;
}
typedef struct SClientHbKey {
int32_t connId;
int32_t hbType;
} SClientHbKey;
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
int tlen = 0;
......@@ -174,32 +210,6 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return buf;
}
typedef struct SClientHbReq {
SClientHbKey connKey;
SHashObj* info; // hash<Slv.key, Sklv>
} SClientHbReq;
typedef struct SClientHbBatchReq {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
typedef struct SClientHbRsp {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
typedef struct SBuildTableMetaInput {
int32_t vgId;
char* dbName;
......
......@@ -18,9 +18,11 @@
#include "thash.h"
#include "tmsg.h"
#define HEARTBEAT_INTERVAL 1500 //ms
typedef enum {
mq = 0,
// type can be added here
HEARTBEAT_TYPE_MQ = 0,
// types can be added here
//
HEARTBEAT_TYPE_MAX
} EHbType;
......@@ -28,26 +30,16 @@ typedef enum {
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param);
typedef struct SClientHbMgr {
int8_t inited;
int32_t reportInterval; // unit ms
int32_t stats;
SRWLatch lock;
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
// input queue
} SClientHbMgr;
static SClientHbMgr clientHbMgr = {0};
// called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(void* hbMsg);
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
//called by user
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SClientHbKey connKey);
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// mq
void hbMgrInitMqHbRspHandle();
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "clientHb.h"
typedef struct SClientHbMgr {
int8_t inited;
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// thread
pthread_t thread;
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
static FORCE_INLINE void hbMgrInitHandle() {
// init all handle
hbMgrInitMqHbRspHandle();
}
static SClientHbBatchReq* hbGatherAllInfo() {
SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq));
if(pReq == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt);
pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
void *pIter = taosHashIterate(clientHbMgr.activeInfo, pIter);
while (pIter != NULL) {
taosArrayPush(pReq->reqs, pIter);
SClientHbReq* pOneReq = pIter;
taosHashClear(pOneReq->info);
pIter = taosHashIterate(clientHbMgr.activeInfo, pIter);
}
return pReq;
}
static void* hbThreadFunc(void* param) {
setThreadName("hb");
while (1) {
atomic_add_fetch_32(&clientHbMgr.reportCnt, 1);
taosMsleep(HEARTBEAT_INTERVAL);
}
return NULL;
}
static int32_t hbCreateThread() {
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pthread_attr_destroy(&thAttr);
return 0;
}
int hbMgrInit() {
// init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
// init stat
clientHbMgr.startTime = taosGetTimestampMs();
// init handle funcs
hbMgrInitHandle();
// init hash info
clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
clientHbMgr.activeInfo->freeFp = tFreeClientHbReq;
// init getInfoFunc
clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
// init backgroud thread
return 0;
}
void hbMgrCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return;
taosHashCleanup(clientHbMgr.activeInfo);
taosHashCleanup(clientHbMgr.getInfoFuncs);
}
int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
int64_t reqId = hbRsp->reqId;
int64_t rspId = hbRsp->rspId;
SArray* rsps = hbRsp->rsps;
int32_t sz = taosArrayGetSize(rsps);
for (int i = 0; i < sz; i++) {
SClientHbRsp* pRsp = taosArrayGet(rsps, i);
if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) {
clientHbMgr.handle[pRsp->connKey.hbType](pRsp);
} else {
// discard rsp
}
}
return 0;
}
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) {
// init hash in activeinfo
void* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) {
return 0;
}
SClientHbReq hbReq;
hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
// init hash
if (func != NULL) {
taosHashPut(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo));
}
atomic_add_fetch_32(&clientHbMgr.connKeyCnt, 1);
return 0;
}
void hbDeregisterConn(SClientHbKey connKey) {
taosHashRemove(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
taosHashRemove(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey));
atomic_sub_fetch_32(&clientHbMgr.connKeyCnt, 1);
}
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
// find req by connection id
SClientHbReq* pReq = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(pReq != NULL);
taosHashPut(pReq->info, key, keyLen, value, valueLen);
return 0;
}
......@@ -19,59 +19,7 @@ static int32_t mqHbRspHandle(SClientHbRsp* pReq) {
return 0;
}
uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
return 0;
}
static void hbMgrInitMqHbFunc() {
clientHbMgr.handle[mq] = mqHbRspHandle;
}
int hbMgrInit() {
//init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
//init config
clientHbMgr.reportInterval = 1500;
//init stat
clientHbMgr.stats = 0;
//init lock
taosInitRWLatch(&clientHbMgr.lock);
//init handle funcs
hbMgrInitMqHbFunc();
//init hash info
clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
//init getInfoFunc
clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
return 0;
void hbMgrInitMqHbRspHandle() {
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = mqHbRspHandle;
}
void hbMgrCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return;
taosHashCleanup(clientHbMgr.activeInfo);
taosHashCleanup(clientHbMgr.getInfoFuncs);
}
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) {
return 0;
}
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
//lock
//find req by connection id
SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(data != NULL);
taosHashPut(data->info, key, keyLen, value, valueLen);
//unlock
return 0;
}
......@@ -31,17 +31,15 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
void *pIter = NULL;
void *data;
SKlv klv;
data = taosHashIterate(pReq->info, pIter);
while (data != NULL) {
taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen);
klv.valueLen = taosHashGetDataLen(data);
klv.value = data;
void* pIter = taosHashIterate(pReq->info, pIter);
while (pIter != NULL) {
taosHashGetKey(pIter, &klv.key, (size_t *)&klv.keyLen);
klv.valueLen = taosHashGetDataLen(pIter);
klv.value = pIter;
taosEncodeSKlv(buf, &klv);
data = taosHashIterate(pReq->info, pIter);
pIter = taosHashIterate(pReq->info, pIter);
}
return tlen;
}
......
......@@ -232,7 +232,8 @@ static int32_t walCreateThread() {
if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) {
wError("failed to create wal thread since %s", strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pthread_attr_destroy(&thAttr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册