提交 5c73c1ff 编写于 作者: L Liu Jicong

refine heartbeat interface

上级 5d3f439a
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
typedef enum { typedef enum {
mq = 0, mq = 0,
// type can be added here
//
HEARTBEAT_TYPE_MAX HEARTBEAT_TYPE_MAX
} EHbType; } EHbType;
...@@ -30,6 +32,60 @@ typedef struct SKlv { ...@@ -30,6 +32,60 @@ typedef struct SKlv {
void* value; void* value;
} SKlv; } SKlv;
typedef struct SClientHbKey {
int32_t connId;
int32_t hbType;
} SClientHbKey;
typedef struct SClientHbReq {
SClientHbKey hbKey;
SHashObj* info; // hash<Slv.key, Sklv>
} SClientHbReq;
typedef struct SClientHbBatchReq {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
typedef struct SClientHbHandleResult {
} SClientHbHandleResult;
typedef struct SClientHbRsp {
int32_t connId;
int32_t hbType;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param);
typedef struct SClientHbMgr {
int8_t inited;
int32_t reportInterval; // unit ms
int32_t stats;
SRWLatch lock;
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
// input queue
} SClientHbMgr;
static SClientHbMgr clientHbMgr = {0};
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(void* hbMsg);
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func);
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKlv->keyLen); tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
...@@ -40,18 +96,13 @@ static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { ...@@ -40,18 +96,13 @@ static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
} }
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
buf = taosDecodeFixedI32(buf, &pKlv->keyLen); buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
buf = taosDecodeFixedI32(buf, &pKlv->valueLen); buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
return buf; return buf;
} }
typedef struct SClientHbKey {
int32_t connId;
int32_t hbType;
} SClientHbKey;
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKey->connId); tlen += taosEncodeFixedI32(buf, pKey->connId);
...@@ -65,18 +116,13 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) ...@@ -65,18 +116,13 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return buf; return buf;
} }
typedef struct SClientHbReq {
SClientHbKey hbKey;
SHashObj* info; // hash<Sklv>
} SClientHbReq;
static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey); tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey);
void* pIter = NULL; void* pIter = NULL;
void* data; void* data;
SKlv klv; SKlv klv;
data = taosHashIterate(pReq->info, pIter); data = taosHashIterate(pReq->info, pIter);
while (data != NULL) { while (data != NULL) {
taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen); taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen);
...@@ -93,56 +139,13 @@ static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) ...@@ -93,56 +139,13 @@ static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq)
ASSERT(pReq->info != NULL); ASSERT(pReq->info != NULL);
buf = taosDecodeSClientHbKey(buf, &pReq->hbKey); buf = taosDecodeSClientHbKey(buf, &pReq->hbKey);
//TODO: error handling // TODO: error handling
if(pReq->info == NULL) { if (pReq->info == NULL) {
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
} }
SKlv klv; SKlv klv;
buf = taosDecodeSKlv(buf, &klv); buf = taosDecodeSKlv(buf, &klv);
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
return buf; return buf;
} }
typedef struct SClientHbBatchReq {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
typedef struct SClientHbHandleResult {
} SClientHbHandleResult;
typedef struct SClientHbRsp {
int32_t connId;
int32_t hbType;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
typedef int32_t (*FHbRspHandle)(SClientHbReq* pReq);
typedef int32_t (*FGetConnInfo)(int32_t conn, void* self);
typedef struct SClientHbMgr {
int8_t inited;
int32_t reportInterval; // unit ms
int32_t stats;
SRWLatch lock;
SHashObj* info; //hash<SClientHbKey, SClientHbReq>
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
// input queue
} SClientHbMgr;
static SClientHbMgr clientHbMgr = {0};
int hbMgrInit();
void hbMgrCleanUp();
int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle);
int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle);
int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen);
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "clientHb.h" #include "clientHb.h"
static int32_t mqHbRspHandle(SClientHbReq* pReq) { static int32_t mqHbRspHandle(SClientHbRsp* pReq) {
return 0; return 0;
} }
...@@ -42,15 +42,12 @@ void hbMgrCleanUp() { ...@@ -42,15 +42,12 @@ void hbMgrCleanUp() {
} }
int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle) { int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) {
return 0;
}
int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle) {
return 0; return 0;
} }
int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen) { int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
//lock //lock
//find req by connection id //find req by connection id
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册