Commit 1e17d776 authored by Liu Jicong

fix crash

Parent 4d635478
@@ -18,7 +18,7 @@
#include "thash.h"
#include "tmsg.h"
#define HEARTBEAT_INTERVAL 1500 //ms
#define HEARTBEAT_INTERVAL 1500 // ms
typedef enum {
HEARTBEAT_TYPE_MQ = 0,
@@ -29,20 +29,50 @@ typedef enum {
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
//TODO: embed param into function
//return type: SArray<Skv>
typedef struct SAppHbMgr {
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
// connection
void* transporter;
SEpSet epSet;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;
typedef struct SClientHbMgr {
int8_t inited;
// ctl
int8_t threadStop;
pthread_t thread;
pthread_mutex_t lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
// called by mgmt
int hbMgrInit(void* transporter, SEpSet epSet);
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
//called by user
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SClientHbKey connKey);
// cluster level
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// mq
void hbMgrInitMqHbRspHandle();
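
For orientation, a minimal sketch of the call order this new two-level API implies: one global init per process, one SAppHbMgr per cluster, then per-connection registration against that manager. The demo* names, the empty callback body, and the zero-initialized key are hypothetical; only the hbMgr*/appHbMgr*/hb*Conn declarations come from the header above.

// hypothetical callback: would collect an SArray<SKv> describing this connection
static SArray* demoGetConnInfo(SClientHbKey connKey, void* param) { return NULL; }

void demoLifecycle(void* transporter, SEpSet epSet) {
  hbMgrInit();                                             // global, once per process
  SAppHbMgr* pAppHbMgr = appHbMgrInit(transporter, epSet); // one manager per cluster
  SClientHbKey connKey = {0};                              // real key fields are defined in tmsg.h
  hbRegisterConn(pAppHbMgr, connKey, demoGetConnInfo);     // conn level
  // ... the background thread reports heartbeats every HEARTBEAT_INTERVAL ms ...
  hbDeregisterConn(pAppHbMgr, connKey);
  appHbMgrCleanup(pAppHbMgr);
  hbMgrCleanUp();                                          // global teardown
}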
@@ -16,27 +16,6 @@
#include "clientHb.h"
#include "trpc.h"
typedef struct SClientHbMgr {
int8_t inited;
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
int8_t threadStop;
pthread_t thread;
SRWLatch lock; // lock is used in serialization
// connection
void* transporter;
SEpSet epSet;
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
@@ -55,32 +34,32 @@ static FORCE_INLINE void hbMgrInitHandle() {
hbMgrInitMqHbRspHandle();
}
static SClientHbBatchReq* hbGatherAllInfo() {
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq));
if(pReq == NULL) {
if (pReq == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt);
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
void *pIter = taosHashIterate(clientHbMgr.activeInfo, NULL);
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
taosArrayPush(pReq->reqs, pIter);
SClientHbReq* pOneReq = pIter;
taosHashClear(pOneReq->info);
pIter = taosHashIterate(clientHbMgr.activeInfo, pIter);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
pIter = taosHashIterate(clientHbMgr.getInfoFuncs, NULL);
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = *(FGetConnInfo*)pIter; // pIter points at the stored function pointer
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(clientHbMgr.activeInfo, pIter);
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter); // advance in the hash being iterated
}
return pReq;
@@ -93,22 +72,24 @@ static void* hbThreadFunc(void* param) {
if(threadStop) {
break;
}
SClientHbBatchReq* pReq = hbGatherAllInfo();
void* reqStr = NULL;
int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq);
SMsgSendInfo info;
/*info.fp = hbHandleRsp;*/
int64_t transporterId = 0;
asyncSendMsgToServer(clientHbMgr.transporter, &clientHbMgr.epSet, &transporterId, &info);
tFreeClientHbBatchReq(pReq);
atomic_add_fetch_32(&clientHbMgr.reportCnt, 1);
taosMsleep(HEARTBEAT_INTERVAL);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for(int i = 0; i < sz; i++) {
SAppHbMgr* pAppHbMgr = *(SAppHbMgr**)taosArrayGet(clientHbMgr.appHbMgrs, i); // elements are SAppHbMgr*
SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
void* reqStr = NULL;
int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq);
SMsgSendInfo info;
/*info.fp = hbHandleRsp;*/
int64_t transporterId = 0;
asyncSendMsgToServer(pAppHbMgr->transporter, &pAppHbMgr->epSet, &transporterId, &info);
tFreeClientHbBatchReq(pReq);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
taosMsleep(HEARTBEAT_INTERVAL); // sleep once per round, after all clusters have been reported
}
return NULL;
}
@@ -129,27 +110,55 @@ static void hbStopThread() {
atomic_store_8(&clientHbMgr.threadStop, 1);
}
int hbMgrInit(void* transporter, SEpSet epSet) {
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) {
SAppHbMgr* pAppHbMgr = calloc(1, sizeof(SAppHbMgr)); // zero-init so reportCnt/connKeyCnt start at 0
if (pAppHbMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// init stat
pAppHbMgr->startTime = taosGetTimestampMs();
// init connection info
pAppHbMgr->transporter = transporter;
pAppHbMgr->epSet = epSet;
// init hash info
pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
// init getInfoFunc
pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
return pAppHbMgr;
}
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
pthread_mutex_lock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
SAppHbMgr* pTarget = *(SAppHbMgr**)taosArrayGet(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == pTarget) {
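// free the manager's hash tables; the entry itself remains in clientHbMgr.appHbMgrs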
taosHashCleanup(pTarget->activeInfo);
taosHashCleanup(pTarget->getInfoFuncs);
}
}
pthread_mutex_unlock(&clientHbMgr.lock);
}
int hbMgrInit() {
// init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
// init stat
clientHbMgr.startTime = taosGetTimestampMs();
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*));
pthread_mutex_init(&clientHbMgr.lock, NULL);
// init handle funcs
hbMgrInitHandle();
// init hash info
clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
clientHbMgr.activeInfo->freeFp = tFreeClientHbReq;
// init getInfoFunc
clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
//init connection info
clientHbMgr.transporter = transporter;
clientHbMgr.epSet = epSet;
// init backgroud thread
hbCreateThread();
@@ -157,11 +166,12 @@ int hbMgrInit(void* transporter, SEpSet epSet) {
}
void hbMgrCleanUp() {
// destroy all appHbMgr
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return;
taosHashCleanup(clientHbMgr.activeInfo);
taosHashCleanup(clientHbMgr.getInfoFuncs);
taosArrayDestroy(clientHbMgr.appHbMgrs);
}
int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
@@ -181,34 +191,34 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
return 0;
}
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) {
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) {
// init hash in activeinfo
void* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) {
return 0;
}
SClientHbReq hbReq;
hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
// init hash
if (func != NULL) {
taosHashPut(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo));
taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), &func, sizeof(FGetConnInfo)); // store the pointer value, not the code it points to
}
atomic_add_fetch_32(&clientHbMgr.connKeyCnt, 1);
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
return 0;
}
void hbDeregisterConn(SClientHbKey connKey) {
taosHashRemove(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
taosHashRemove(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey));
atomic_sub_fetch_32(&clientHbMgr.connKeyCnt, 1);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey));
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
// find req by connection id
SClientHbReq* pReq = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(pReq != NULL);
taosHashPut(pReq->info, key, keyLen, value, valueLen);
@@ -31,7 +31,9 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
SKv kv;
int kvNum = taosHashGetSize(pReq->info);
tlen += taosEncodeFixedI32(buf, kvNum);
SKv kv;
void* pIter = taosHashIterate(pReq->info, NULL);
while (pIter != NULL) {
size_t keyLen = 0;
taosHashGetKey(pIter, &kv.key, &keyLen); // avoid aliasing an int32 field as size_t
kv.keyLen = (int32_t)keyLen;
@@ -49,12 +51,14 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) {
buf = taosDecodeSClientHbKey(buf, &pReq->connKey);
// TODO: error handling
if (pReq->info == NULL) {
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
int kvNum;
taosDecodeFixedI32(buf, &kvNum);
pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
for(int i = 0; i < kvNum; i++) {
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen);
}
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen);
return buf;
}
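
The encode/decode changes above settle on a count-prefixed wire layout: the connection key, then a fixed int32 kvNum, then kvNum serialized SKv entries; the old decoder read exactly one entry no matter how many were written. A standalone sketch of that framing, using plain memcpy instead of the real taosEncode*/taosDecode* helpers (all names and data below are illustrative):

#include <stdint.h>
#include <stdio.h>
#include <string.h>

// count-prefixed framing: [int32 n][n x (int32 len, payload bytes)]
static char* encodeEntries(char* p, const char** items, int32_t n) {
  memcpy(p, &n, sizeof(n)); p += sizeof(n);
  for (int32_t i = 0; i < n; i++) {
    int32_t len = (int32_t)strlen(items[i]);
    memcpy(p, &len, sizeof(len)); p += sizeof(len);
    memcpy(p, items[i], len); p += len;
  }
  return p;
}

static const char* decodeEntries(const char* p) {
  int32_t n;
  memcpy(&n, p, sizeof(n)); p += sizeof(n);
  for (int32_t i = 0; i < n; i++) { // loop n times, as the new decoder does with kvNum
    int32_t len;
    memcpy(&len, p, sizeof(len)); p += sizeof(len);
    printf("entry %d: %.*s\n", i, len, p);
    p += len;
  }
  return p;
}

int main(void) {
  const char* items[] = {"status=ok", "db=test"};
  char buf[128];
  encodeEntries(buf, items, 2);
  decodeEntries(buf);
  return 0;
}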
@@ -95,11 +95,11 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
if (sdbWriteFile(pSdb) != 0) {
goto WAL_RESTORE_OVER;
}
}
if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER;
}
}
code = 0;
@@ -181,4 +181,4 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
return pMgmt->state == TAOS_SYNC_STATE_LEADER;
}
\ No newline at end of file
}
@@ -131,6 +131,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// seek section
int walChangeFile(SWal* pWal, int64_t ver);
int walChangeFileToLast(SWal* pWal);
// seek section end
int64_t walGetSeq();
@@ -184,6 +184,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
}
taosArraySetSize(pArray, sz);
pWal->fileInfoSet = pArray;
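// point the write cursor at the last file so subsequent writes append to the right log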
pWal->writeCur = sz - 1;
cJSON_Delete(pRoot);
return 0;
}
@@ -254,6 +254,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset};
int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
// TODO truncate
return -1;
}
@@ -287,7 +288,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
}
/*if (!tfValid(pWal->writeLogTfd)) return -1;*/
ASSERT(pWal->writeCur >= 0);
pthread_mutex_lock(&pWal->mutex);
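// reopen the current log/idx files on demand if they are not open yet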
if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) {
walChangeFileToLast(pWal);
}
pWal->writeHead.head.version = index;
int64_t offset = walGetCurFileOffset(pWal);
@@ -309,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
}
code = walWriteIndex(pWal, index, offset);
if (code != 0) {
// TODO