提交 3b69bde0 编写于 作者: L Liu Jicong

add mq defination

上级 05372238
...@@ -193,8 +193,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); ...@@ -193,8 +193,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
DLL_EXPORT TAOS_RES* tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -68,6 +68,14 @@ typedef uint16_t tmsg_t; ...@@ -68,6 +68,14 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
typedef enum {
HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_QUERY = 1,
// types can be added here
//
HEARTBEAT_TYPE_MAX
} EHbType;
typedef enum _mgmt_table { typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT, TSDB_MGMT_TABLE_ACCT,
...@@ -220,6 +228,7 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) ...@@ -220,6 +228,7 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return buf; return buf;
} }
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
char* dbName; char* dbName;
...@@ -359,6 +368,31 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { ...@@ -359,6 +368,31 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
return buf; return buf;
} }
typedef struct SMqHbRsp {
int8_t status; //idle or not
int8_t vnodeChanged;
int8_t epChanged; // should use new epset
int8_t reserved;
SEpSet epSet;
} SMqHbRsp;
static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
int tlen = 0;
tlen += taosEncodeFixedI8(buf, pRsp->status);
tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
tlen += taosEncodeFixedI8(buf, pRsp->epChanged);
tlen += taosEncodeSEpSet(buf, &pRsp->epSet);
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
buf = taosDecodeFixedI8(buf, &pRsp->status);
buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged);
buf = taosDecodeFixedI8(buf, &pRsp->epChanged);
buf = taosDecodeSEpSet(buf, &pRsp->epSet);
return buf;
}
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
int64_t clusterId; int64_t clusterId;
......
...@@ -129,7 +129,7 @@ enum { ...@@ -129,7 +129,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tarray.h"
#include "thash.h"
#include "tmsg.h"
#define HEARTBEAT_INTERVAL 1500 // ms
typedef enum {
HEARTBEAT_TYPE_MQ = 0,
// types can be added here
//
HEARTBEAT_TYPE_MAX
} EHbType;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef struct SAppHbMgr {
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
// connection
void* transporter;
SEpSet epSet;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;
typedef struct SClientHbMgr {
int8_t inited;
// ctl
int8_t threadStop;
pthread_t thread;
pthread_mutex_t lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// mq
void hbMgrInitMqHbRspHandle();
...@@ -31,6 +31,41 @@ extern "C" { ...@@ -31,6 +31,41 @@ extern "C" {
#include "trpc.h" #include "trpc.h"
#include "query.h" #include "query.h"
#define HEARTBEAT_INTERVAL 1500 // ms
typedef struct SAppInstInfo SAppInstInfo;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef struct SAppHbMgr {
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
// connection
SAppInstInfo* pAppInstInfo;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;
typedef struct SClientHbMgr {
int8_t inited;
// ctl
int8_t threadStop;
pthread_t thread;
pthread_mutex_t lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp int64_t start; // start timestamp
int64_t parsed; // start to parse int64_t parsed; // start to parse
...@@ -55,15 +90,15 @@ typedef struct SHeartBeatInfo { ...@@ -55,15 +90,15 @@ typedef struct SHeartBeatInfo {
void *pTimer; // timer, used to send request msg to mnode void *pTimer; // timer, used to send request msg to mnode
} SHeartBeatInfo; } SHeartBeatInfo;
typedef struct SAppInstInfo { struct SAppInstInfo {
int64_t numOfConns; int64_t numOfConns;
SCorEpSet mgmtEp; SCorEpSet mgmtEp;
SInstanceSummary summary; SInstanceSummary summary;
SList *pConnList; // STscObj linked list SList *pConnList; // STscObj linked list
int64_t clusterId; int64_t clusterId;
void *pTransporter; void *pTransporter;
SHeartBeatInfo hb; struct SAppHbMgr *pAppHbMgr;
} SAppInstInfo; };
typedef struct SAppInfo { typedef struct SAppInfo {
int64_t startTime; int64_t startTime;
...@@ -81,6 +116,7 @@ typedef struct STscObj { ...@@ -81,6 +116,7 @@ typedef struct STscObj {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int32_t acctId; int32_t acctId;
uint32_t connId; uint32_t connId;
int32_t connType;
uint64_t id; // ref ID returned by taosAddRef uint64_t id; // ref ID returned by taosAddRef
void *pTransporter; void *pTransporter;
pthread_mutex_t mutex; // used to protect the operation on db pthread_mutex_t mutex; // used to protect the operation on db
...@@ -88,6 +124,10 @@ typedef struct STscObj { ...@@ -88,6 +124,10 @@ typedef struct STscObj {
SAppInstInfo *pAppInfo; SAppInstInfo *pAppInfo;
} STscObj; } STscObj;
typedef struct SMqConsumer {
STscObj* pTscObj;
} SMqConsumer;
typedef struct SReqResultInfo { typedef struct SReqResultInfo {
const char *pRspMsg; const char *pRspMsg;
const char *pData; const char *pData;
...@@ -169,6 +209,26 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); ...@@ -169,6 +209,26 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
void *doFetchRow(SRequestObj* pRequest); void *doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
// --- heartbeat
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// --- mq
void hbMgrInitMqHbRspHandle();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "clientInt.h"
#include "clientHb.h" #include "clientHb.h"
#include "trpc.h" #include "trpc.h"
...@@ -21,10 +22,18 @@ static SClientHbMgr clientHbMgr = {0}; ...@@ -21,10 +22,18 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread(); static int32_t hbCreateThread();
static void hbStopThread(); static void hbStopThread();
static int32_t hbMqHbRspHandle(SClientHbRsp* pReq) { static int32_t hbMqHbRspHandle(SClientHbRsp* pRsp) {
return 0; return 0;
} }
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) {
if (code != 0) {
return -1;
}
SClientHbRsp* pRsp = (SClientHbRsp*) pMsg->pData;
return hbMqHbRspHandle(pRsp);
}
void hbMgrInitMqHbRspHandle() { void hbMgrInitMqHbRspHandle() {
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
} }
...@@ -77,18 +86,31 @@ static void* hbThreadFunc(void* param) { ...@@ -77,18 +86,31 @@ static void* hbThreadFunc(void* param) {
for(int i = 0; i < sz; i++) { for(int i = 0; i < sz; i++) {
SAppHbMgr* pAppHbMgr = taosArrayGet(clientHbMgr.appHbMgrs, i); SAppHbMgr* pAppHbMgr = taosArrayGet(clientHbMgr.appHbMgrs, i);
SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr); SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
void* reqStr = NULL; int tlen = tSerializeSClientHbBatchReq(NULL, pReq);
int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq); void *buf = malloc(tlen);
if (buf == NULL) {
//TODO: error handling
break;
}
tSerializeSClientHbBatchReq(buf, pReq);
SMsgSendInfo info; SMsgSendInfo info;
/*info.fp = hbHandleRsp;*/ info.fp = hbMqAsyncCallBack;
info.msgInfo.pData = buf;
info.msgInfo.len = tlen;
info.msgType = TDMT_MND_HEARTBEAT;
info.param = NULL;
info.requestId = generateRequestId();
info.requestObjRefId = -1;
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pAppHbMgr->transporter, &pAppHbMgr->epSet, &transporterId, &info); SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, &info);
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
taosMsleep(HEARTBEAT_INTERVAL);
} }
taosMsleep(HEARTBEAT_INTERVAL);
} }
return NULL; return NULL;
} }
...@@ -110,7 +132,8 @@ static void hbStopThread() { ...@@ -110,7 +132,8 @@ static void hbStopThread() {
atomic_store_8(&clientHbMgr.threadStop, 1); atomic_store_8(&clientHbMgr.threadStop, 1);
} }
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) { SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
hbMgrInit();
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) { if (pAppHbMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -119,9 +142,8 @@ SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) { ...@@ -119,9 +142,8 @@ SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) {
// init stat // init stat
pAppHbMgr->startTime = taosGetTimestampMs(); pAppHbMgr->startTime = taosGetTimestampMs();
// init connection info // init app info
pAppHbMgr->transporter = transporter; pAppHbMgr->pAppInstInfo = pAppInstInfo;
pAppHbMgr->epSet = epSet;
// init hash info // init hash info
pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
...@@ -171,7 +193,6 @@ void hbMgrCleanUp() { ...@@ -171,7 +193,6 @@ void hbMgrCleanUp() {
if (old == 0) return; if (old == 0) return;
taosArrayDestroy(clientHbMgr.appHbMgrs); taosArrayDestroy(clientHbMgr.appHbMgrs);
} }
int hbHandleRsp(SClientHbBatchRsp* hbRsp) { int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
......
...@@ -113,6 +113,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -113,6 +113,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p; pInst = &p;
...@@ -220,6 +221,101 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { ...@@ -220,6 +221,101 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL /*todo appInfo.xxx*/, pDag, pJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL /*todo appInfo.xxx*/, pDag, pJob);
} }
typedef struct tmq_t tmq_t;
typedef struct SMqClientTopic {
// subscribe info
int32_t sqlLen;
char* sql;
char* topicName;
int64_t topicId;
// statistics
int64_t consumeCnt;
// offset
int64_t committedOffset;
int64_t currentOffset;
//connection info
int32_t vgId;
SEpSet epSet;
} SMqClientTopic;
typedef struct tmq_resp_err_t {
int32_t code;
} tmq_resp_err_t;
typedef struct tmq_topic_vgroup_list_t {
char* topicName;
int32_t vgId;
int64_t committedOffset;
} tmq_topic_vgroup_list_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
typedef struct tmq_conf_t{
char* clientId;
char* groupId;
char* ip;
uint16_t port;
tmq_commit_cb* commit_cb;
} tmq_conf_t;
struct tmq_t {
char groupId[256];
char clientId[256];
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
SArray* clientTopics; // SArray<SMqClientTopic>
};
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
conf->commit_cb = cb;
}
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
tmq_t* pTmq = (void*)param;
SArray* pArray = taosArrayInit(0, sizeof(SKv));
if (pArray == NULL) {
return NULL;
}
SKv kv = {0};
kv.key = malloc(256);
if (kv.key == NULL) {
taosArrayDestroy(pArray);
return NULL;
}
strcpy(kv.key, "groupId");
kv.keyLen = strlen("groupId") + 1;
kv.value = malloc(256);
if (kv.value == NULL) {
free(kv.key);
taosArrayDestroy(pArray);
return NULL;
}
strcpy(kv.value, pTmq->groupId);
kv.valueLen = strlen(pTmq->groupId) + 1;
taosArrayPush(pArray, &kv);
strcpy(kv.key, "clientUid");
kv.keyLen = strlen("clientUid") + 1;
*(uint32_t*)kv.value = pTmq->pTscObj->connId;
kv.valueLen = sizeof(uint32_t);
return NULL;
}
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* pTmq = malloc(sizeof(tmq_t));
if (pTmq == NULL) {
return NULL;
}
strcpy(pTmq->groupId, conf->groupId);
strcpy(pTmq->clientId, conf->clientId);
pTmq->pTscObj = (STscObj*)conn;
pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;
return pTmq;
}
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
...@@ -281,6 +377,25 @@ _return: ...@@ -281,6 +377,25 @@ _return:
return pRequest; return pRequest;
} }
typedef struct tmq_message_t {
int32_t numOfRows;
char* topicName;
TAOS_ROW row[];
} tmq_message_t;
tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) {
return NULL;
}
tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) {
return NULL;
}
void tmq_message_destroy(tmq_message_t* mq_message) {
}
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
STscObj *pTscObj = (STscObj *)taos; STscObj *pTscObj = (STscObj *)taos;
if (sqlLen > (size_t) tsMaxSQLStringLen) { if (sqlLen > (size_t) tsMaxSQLStringLen) {
......
...@@ -71,6 +71,9 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -71,6 +71,9 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->clusterId = pConnect->clusterId; pTscObj->pAppInfo->clusterId = pConnect->clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
...@@ -382,4 +385,4 @@ void initMsgHandleFp() { ...@@ -382,4 +385,4 @@ void initMsgHandleFp() {
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp; handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp; handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp;
} }
\ No newline at end of file
...@@ -258,6 +258,23 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { ...@@ -258,6 +258,23 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
} }
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
char *batchReqStr = pReq->rpcMsg.pCont;
SClientHbBatchReq batchReq = {0};
tDeserializeClientHbBatchReq(batchReqStr, &batchReq);
SArray *pArray = batchReq.reqs;
int sz = taosArrayGetSize(pArray);
for (int i = 0; i < sz; i++) {
SClientHbReq* pReq = taosArrayGet(pArray, i);
if (pReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
} else if (pReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
}
}
return 0;
#if 0
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
...@@ -327,6 +344,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { ...@@ -327,6 +344,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
pReq->contLen = sizeof(SConnectRsp); pReq->contLen = sizeof(SConnectRsp);
pReq->pCont = pRsp; pReq->pCont = pRsp;
return 0; return 0;
#endif
} }
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) { static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
......
...@@ -199,8 +199,10 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -199,8 +199,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
} }
int walCheckAndRepairIdx(SWal* pWal) { int walCheckAndRepairIdx(SWal* pWal) {
// iterate all idx files // TODO: iterate all log files
// check first and last entry of each idx file valid // if idx not found, scan log and write idx
// if found, check complete by first and last entry of each idx file
// if idx incomplete, binary search last valid entry, and then build other part
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册