提交 7583b45e 编写于 作者: L Liu Jicong

refine heartbeat interface

上级 11449f3d
...@@ -26,6 +26,7 @@ extern "C" { ...@@ -26,6 +26,7 @@ extern "C" {
#include "tarray.h" #include "tarray.h"
#include "tcoding.h" #include "tcoding.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "thash.h"
#include "tlist.h" #include "tlist.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */ /* ------------------------ MESSAGE DEFINITIONS ------------------------ */
...@@ -132,6 +133,73 @@ typedef enum _mgmt_table { ...@@ -132,6 +133,73 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
typedef struct SKlv {
int32_t keyLen;
int32_t valueLen;
void* key;
void* value;
} SKlv;
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
tlen += taosEncodeFixedI32(buf, pKlv->valueLen);
tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen);
tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
return buf;
}
typedef struct SClientHbKey {
int32_t connId;
int32_t hbType;
} SClientHbKey;
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKey->connId);
tlen += taosEncodeFixedI32(buf, pKey->hbType);
return tlen;
}
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
buf = taosDecodeFixedI32(buf, &pKey->connId);
buf = taosDecodeFixedI32(buf, &pKey->hbType);
return buf;
}
typedef struct SClientHbReq {
SClientHbKey connKey;
SHashObj* info; // hash<Slv.key, Sklv>
} SClientHbReq;
typedef struct SClientHbBatchReq {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
typedef struct SClientHbRsp {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
typedef struct SBuildTableMetaInput { typedef struct SBuildTableMetaInput {
int32_t vgId; int32_t vgId;
char* dbName; char* dbName;
...@@ -1126,7 +1194,7 @@ typedef struct { ...@@ -1126,7 +1194,7 @@ typedef struct {
int32_t topicNum; int32_t topicNum;
int64_t consumerId; int64_t consumerId;
char* consumerGroup; char* consumerGroup;
SArray* topicNames; // SArray<char*> SArray* topicNames; // SArray<char*>
} SCMSubscribeReq; } SCMSubscribeReq;
static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
...@@ -1135,7 +1203,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe ...@@ -1135,7 +1203,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe
tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->consumerGroup); tlen += taosEncodeString(buf, pReq->consumerGroup);
for(int i = 0; i < pReq->topicNum; i++) { for (int i = 0; i < pReq->topicNum; i++) {
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
} }
return tlen; return tlen;
...@@ -1146,7 +1214,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq ...@@ -1146,7 +1214,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeString(buf, &pReq->consumerGroup); buf = taosDecodeString(buf, &pReq->consumerGroup);
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
for(int i = 0; i < pReq->topicNum; i++) { for (int i = 0; i < pReq->topicNum; i++) {
char* name = NULL; char* name = NULL;
buf = taosDecodeString(buf, &name); buf = taosDecodeString(buf, &name);
taosArrayPush(pReq->topicNames, &name); taosArrayPush(pReq->topicNames, &name);
...@@ -1161,14 +1229,14 @@ typedef struct SMqSubTopic { ...@@ -1161,14 +1229,14 @@ typedef struct SMqSubTopic {
} SMqSubTopic; } SMqSubTopic;
typedef struct { typedef struct {
int32_t topicNum; int32_t topicNum;
SMqSubTopic topics[]; SMqSubTopic topics[];
} SCMSubscribeRsp; } SCMSubscribeRsp;
static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI32(buf, pRsp->topicNum); tlen += taosEncodeFixedI32(buf, pRsp->topicNum);
for(int i = 0; i < pRsp->topicNum; i++) { for (int i = 0; i < pRsp->topicNum; i++) {
tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId);
tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId);
tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet);
...@@ -1178,7 +1246,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribe ...@@ -1178,7 +1246,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribe
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
buf = taosDecodeFixedI32(buf, &pRsp->topicNum); buf = taosDecodeFixedI32(buf, &pRsp->topicNum);
for(int i = 0; i < pRsp->topicNum; i++) { for (int i = 0; i < pRsp->topicNum; i++) {
buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId);
buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId);
buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet);
......
...@@ -25,44 +25,6 @@ typedef enum { ...@@ -25,44 +25,6 @@ typedef enum {
HEARTBEAT_TYPE_MAX HEARTBEAT_TYPE_MAX
} EHbType; } EHbType;
typedef struct SKlv {
int32_t keyLen;
int32_t valueLen;
void* key;
void* value;
} SKlv;
typedef struct SClientHbKey {
int32_t connId;
int32_t hbType;
} SClientHbKey;
typedef struct SClientHbReq {
SClientHbKey connKey;
SHashObj* info; // hash<Slv.key, Sklv>
} SClientHbReq;
typedef struct SClientHbBatchReq {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
typedef struct SClientHbHandleResult {
} SClientHbHandleResult;
typedef struct SClientHbRsp {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param);
...@@ -81,73 +43,11 @@ static SClientHbMgr clientHbMgr = {0}; ...@@ -81,73 +43,11 @@ static SClientHbMgr clientHbMgr = {0};
int hbMgrInit(); int hbMgrInit();
void hbMgrCleanUp(); void hbMgrCleanUp();
int hbHandleRsp(void* hbMsg); int hbHandleRsp(void* hbMsg);
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func);
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func);
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
tlen += taosEncodeFixedI32(buf, pKlv->valueLen);
tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen);
tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
return buf;
}
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKey->connId);
tlen += taosEncodeFixedI32(buf, pKey->hbType);
return tlen;
}
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
buf = taosDecodeFixedI32(buf, &pKey->connId);
buf = taosDecodeFixedI32(buf, &pKey->hbType);
return buf;
}
static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
void* pIter = NULL;
void* data;
SKlv klv;
data = taosHashIterate(pReq->info, pIter);
while (data != NULL) {
taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen);
klv.valueLen = taosHashGetDataLen(data);
klv.value = data;
taosEncodeSKlv(buf, &klv);
data = taosHashIterate(pReq->info, pIter);
}
return tlen;
}
static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) {
ASSERT(pReq->info != NULL);
buf = taosDecodeSClientHbKey(buf, &pReq->connKey);
// TODO: error handling int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
if (pReq->info == NULL) {
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
SKlv klv;
buf = taosDecodeSKlv(buf, &klv);
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
return buf;
}
...@@ -19,27 +19,44 @@ static int32_t mqHbRspHandle(SClientHbRsp* pReq) { ...@@ -19,27 +19,44 @@ static int32_t mqHbRspHandle(SClientHbRsp* pReq) {
return 0; return 0;
} }
uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
return 0;
}
static void hbMgrInitMqHbFunc() {
clientHbMgr.handle[mq] = mqHbRspHandle;
}
int hbMgrInit() { int hbMgrInit() {
//init once //init once
// int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
//init lock if (old == 1) return 0;
//
//init handle funcs //init config
clientHbMgr.handle[mq] = mqHbRspHandle; clientHbMgr.reportInterval = 1500;
//init stat //init stat
clientHbMgr.stats = 0; clientHbMgr.stats = 0;
//init config //init lock
clientHbMgr.reportInterval = 1500; taosInitRWLatch(&clientHbMgr.lock);
//init handle funcs
hbMgrInitMqHbFunc();
//init hash info //init hash info
// clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
//init getInfoFunc
clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
return 0; return 0;
} }
void hbMgrCleanUp() { void hbMgrCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return;
taosHashCleanup(clientHbMgr.activeInfo);
taosHashCleanup(clientHbMgr.getInfoFuncs);
} }
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) {
...@@ -51,6 +68,9 @@ int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, ...@@ -51,6 +68,9 @@ int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen,
//lock //lock
//find req by connection id //find req by connection id
SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(data != NULL);
taosHashPut(data->info, key, keyLen, value, valueLen);
//unlock //unlock
return 0; return 0;
......
...@@ -27,6 +27,40 @@ ...@@ -27,6 +27,40 @@
#undef TD_MSG_SEG_CODE_ #undef TD_MSG_SEG_CODE_
#include "tmsgdef.h" #include "tmsgdef.h"
int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
void *pIter = NULL;
void *data;
SKlv klv;
data = taosHashIterate(pReq->info, pIter);
while (data != NULL) {
taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen);
klv.valueLen = taosHashGetDataLen(data);
klv.value = data;
taosEncodeSKlv(buf, &klv);
data = taosHashIterate(pReq->info, pIter);
}
return tlen;
}
void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) {
ASSERT(pReq->info != NULL);
buf = taosDecodeSClientHbKey(buf, &pReq->connKey);
// TODO: error handling
if (pReq->info == NULL) {
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
SKlv klv;
buf = taosDecodeSKlv(buf, &klv);
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
return buf;
}
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0; int tlen = 0;
...@@ -148,4 +182,4 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { ...@@ -148,4 +182,4 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
} }
return buf; return buf;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册