提交 29cb7c67 编写于 作者: L Liu Jicong

implement client heartbeat

上级 57d34170
......@@ -133,12 +133,12 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
typedef struct SKlv {
typedef struct SKv {
int32_t keyLen;
int32_t valueLen;
void* key;
void* value;
} SKlv;
} SKv;
typedef struct SClientHbKey {
int32_t connId;
......@@ -174,26 +174,36 @@ static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = pReq;
SClientHbReq* req = (SClientHbReq*)pReq;
taosHashCleanup(req->info);
free(pReq);
}
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
free(pReq);
}
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
tlen += taosEncodeFixedI32(buf, pKlv->valueLen);
tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen);
tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen);
tlen += taosEncodeFixedI32(buf, pKv->keyLen);
tlen += taosEncodeFixedI32(buf, pKv->valueLen);
tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen);
tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) {
buf = taosDecodeFixedI32(buf, &pKv->keyLen);
buf = taosDecodeFixedI32(buf, &pKv->valueLen);
buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen);
buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen);
return buf;
}
......
......@@ -23,6 +23,7 @@ extern "C" {
#include "query.h"
#include "tmsg.h"
#include "tarray.h"
#include "trpc.h"
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
......
......@@ -94,55 +94,6 @@ typedef struct STableMetaOutput {
STableMeta *tbMeta;
} STableMetaOutput;
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr{
int32_t nodeId; //vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
const SSchema* tGetTbnameColumnSchema();
void initQueryModuleMsgHandle();
......
......@@ -84,6 +84,55 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr{
int32_t nodeId; //vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -211,6 +211,24 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
/**
* Get the corresponding key information for a given data in hash table, using memcpy
* @param data
* @param dst
* @return
*/
static FORCE_INLINE int32_t taosHashCopyKey(void *data, void* dst) {
if (NULL == data || NULL == dst) {
return -1;
}
SHashNode * node = GET_HASH_PNODE(data);
void* key = GET_HASH_NODE_KEY(node);
memcpy(dst, key, node->keyLen);
return 0;
}
/**
* Get the corresponding data length for a given data in hash table
* @param data
......
......@@ -28,10 +28,13 @@ typedef enum {
} EHbType;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param);
//TODO: embed param into function
//return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
// called by mgmt
int hbMgrInit();
int hbMgrInit(void* transporter, SEpSet epSet);
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
......
......@@ -14,6 +14,7 @@
*/
#include "clientHb.h"
#include "trpc.h"
typedef struct SClientHbMgr {
int8_t inited;
......@@ -23,8 +24,13 @@ typedef struct SClientHbMgr {
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// thread
// ctl
int8_t threadStop;
pthread_t thread;
SRWLatch lock; // lock is used in serialization
// connection
void* transporter;
SEpSet epSet;
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
......@@ -36,6 +42,14 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
static int32_t hbMqHbRspHandle(SClientHbRsp* pReq) {
return 0;
}
void hbMgrInitMqHbRspHandle() {
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
}
static FORCE_INLINE void hbMgrInitHandle() {
// init all handle
hbMgrInitMqHbRspHandle();
......@@ -50,11 +64,22 @@ static SClientHbBatchReq* hbGatherAllInfo() {
int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt);
pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
void *pIter = taosHashIterate(clientHbMgr.activeInfo, pIter);
void *pIter = taosHashIterate(clientHbMgr.activeInfo, NULL);
while (pIter != NULL) {
taosArrayPush(pReq->reqs, pIter);
SClientHbReq* pOneReq = pIter;
taosHashClear(pOneReq->info);
pIter = taosHashIterate(clientHbMgr.activeInfo, pIter);
}
pIter = taosHashIterate(clientHbMgr.getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(clientHbMgr.activeInfo, pIter);
}
......@@ -64,8 +89,23 @@ static SClientHbBatchReq* hbGatherAllInfo() {
static void* hbThreadFunc(void* param) {
setThreadName("hb");
while (1) {
int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop);
if(threadStop) {
break;
}
SClientHbBatchReq* pReq = hbGatherAllInfo();
void* reqStr = NULL;
tSerializeSClientHbBatchReq(&reqStr, pReq);
SMsgSendInfo info;
int64_t transporterId = 0;
asyncSendMsgToServer(clientHbMgr.transporter, &clientHbMgr.epSet, &transporterId, &info);
tFreeClientHbBatchReq(pReq);
atomic_add_fetch_32(&clientHbMgr.reportCnt, 1);
taosMsleep(HEARTBEAT_INTERVAL);
}
return NULL;
......@@ -84,7 +124,11 @@ static int32_t hbCreateThread() {
return 0;
}
int hbMgrInit() {
static void hbStopThread() {
atomic_store_8(&clientHbMgr.threadStop, 1);
}
int hbMgrInit(void* transporter, SEpSet epSet) {
// init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
......@@ -101,7 +145,13 @@ int hbMgrInit() {
// init getInfoFunc
clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
//init connection info
clientHbMgr.transporter = transporter;
clientHbMgr.epSet = epSet;
// init backgroud thread
hbCreateThread();
return 0;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "clientHb.h"
static int32_t mqHbRspHandle(SClientHbRsp* pReq) {
return 0;
}
void hbMgrInitMqHbRspHandle() {
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = mqHbRspHandle;
}
......@@ -31,13 +31,13 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
SKlv klv;
SKv kv;
void* pIter = taosHashIterate(pReq->info, pIter);
while (pIter != NULL) {
taosHashGetKey(pIter, &klv.key, (size_t *)&klv.keyLen);
klv.valueLen = taosHashGetDataLen(pIter);
klv.value = pIter;
taosEncodeSKlv(buf, &klv);
taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen);
kv.valueLen = taosHashGetDataLen(pIter);
kv.value = pIter;
taosEncodeSKv(buf, &kv);
pIter = taosHashIterate(pReq->info, pIter);
}
......@@ -52,13 +52,22 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) {
if (pReq->info == NULL) {
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
SKlv klv;
buf = taosDecodeSKlv(buf, &klv);
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen);
return buf;
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq) {
int tlen = 0;
return tlen;
}
void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq) {
return buf;
}
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0;
......
......@@ -15,5 +15,5 @@ TARGET_INCLUDE_DIRECTORIES(
TARGET_LINK_LIBRARIES(
queryUtilTest
PUBLIC os util gtest qcom common
PUBLIC os util gtest qcom common transport
)
......@@ -17,6 +17,7 @@
#include <iostream>
#include "tmsg.h"
#include "query.h"
#include "trpc.h"
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
......@@ -80,4 +81,4 @@ TEST(testCase, error_in_async_test) {
taosAsyncExec(testPrintError, p, &code);
usleep(1000);
printf("Error code:%d after asynchronously exec function\n", code);
}
\ No newline at end of file
}
......@@ -794,7 +794,9 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) {
SHashNode * node = GET_HASH_PNODE(data);
*key = GET_HASH_NODE_KEY(node);
*keyLen = node->keyLen;
if (keyLen) {
*keyLen = node->keyLen;
}
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册