提交 575f6419 编写于 作者: L Liu Jicong

enh(wal): skip read for specific msg

上级 3d972f81
......@@ -1969,7 +1969,6 @@ typedef struct {
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
char* qmsg;
} SMqRebVgReq;
......@@ -1984,7 +1983,6 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen += taosEncodeFixedI8(buf, pReq->withTbName);
tlen += taosEncodeFixedI8(buf, pReq->withSchema);
tlen += taosEncodeFixedI8(buf, pReq->withTag);
tlen += taosEncodeFixedI8(buf, pReq->withTagSchema);
if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
tlen += taosEncodeString(buf, pReq->qmsg);
}
......@@ -2001,7 +1999,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf = taosDecodeFixedI8(buf, &pReq->withTbName);
buf = taosDecodeFixedI8(buf, &pReq->withSchema);
buf = taosDecodeFixedI8(buf, &pReq->withTag);
buf = taosDecodeFixedI8(buf, &pReq->withTagSchema);
if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
buf = taosDecodeString(buf, &pReq->qmsg);
}
......@@ -2590,7 +2587,6 @@ typedef struct {
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
SArray* blockDataLen; // SArray<int32_t>
SArray* blockData; // SArray<SRetrieveTableRsp*>
SArray* blockTbName; // SArray<char*>
......@@ -2609,7 +2605,6 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
tlen += taosEncodeFixedI8(buf, pRsp->withTbName);
tlen += taosEncodeFixedI8(buf, pRsp->withSchema);
tlen += taosEncodeFixedI8(buf, pRsp->withTag);
tlen += taosEncodeFixedI8(buf, pRsp->withTagSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i);
......@@ -2632,7 +2627,6 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
buf = taosDecodeFixedI8(buf, &pRsp->withTagSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = 0;
......
......@@ -192,7 +192,13 @@ int32_t walEndSnapshot(SWal *);
SWalReadHandle *walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead);
// only for tq usage
// int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead);
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead);
// deprecated
#if 0
......
......@@ -382,12 +382,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) return NULL;
/*pTmq->inWaiting = 0;*/
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq->epStatus = 0;
pTmq->epSkipCnt = 0;
// set conf
......
......@@ -126,7 +126,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) {
return 0;
}
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
......@@ -2747,11 +2746,11 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
if (0 == astLen && tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1;
tEndEncode(&encoder);
......@@ -2773,6 +2772,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1;
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
......@@ -2787,7 +2787,6 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (pReq->ast == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
} else {
if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1;
}
tEndDecode(&decoder);
......
......@@ -449,7 +449,6 @@ typedef struct {
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
SRWLatch lock;
int32_t sqlLen;
int32_t astLen;
......@@ -516,7 +515,6 @@ typedef struct {
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub
// TODO put -1 into unassignVgs
// SArray* unassignedVgs;
......
......@@ -237,7 +237,6 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pSubNew->withTbName = pSub->withTbName;
pSubNew->withSchema = pSub->withSchema;
pSubNew->withTag = pSub->withTag;
pSubNew->withTagSchema = pSub->withTagSchema;
pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......@@ -270,7 +269,6 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeFixedI8(buf, pSub->withTbName);
tlen += taosEncodeFixedI8(buf, pSub->withSchema);
tlen += taosEncodeFixedI8(buf, pSub->withTag);
tlen += taosEncodeFixedI8(buf, pSub->withTagSchema);
void *pIter = NULL;
int32_t sz = taosHashGetSize(pSub->consumerHash);
......@@ -297,7 +295,6 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf = taosDecodeFixedI8(buf, &pSub->withTbName);
buf = taosDecodeFixedI8(buf, &pSub->withSchema);
buf = taosDecodeFixedI8(buf, &pSub->withTag);
buf = taosDecodeFixedI8(buf, &pSub->withTagSchema);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
......
......@@ -35,11 +35,6 @@
#define MND_SUBSCRIBE_REBALANCE_CNT 3
enum {
MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
MQ_SUBSCRIBE_STATUS__DELETED,
};
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
......@@ -89,7 +84,6 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
pSub->withTbName = pTopic->withTbName;
pSub->withSchema = pTopic->withSchema;
pSub->withTag = pTopic->withTag;
pSub->withTagSchema = pTopic->withTagSchema;
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
......@@ -115,7 +109,6 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
req.withTbName = pSub->withTbName;
req.withSchema = pSub->withSchema;
req.withTag = pSub->withTag;
req.withTagSchema = pSub->withTagSchema;
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
......@@ -514,9 +507,11 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
// TODO replace assert with error check
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
// if add more consumer to balanced subscribe,
// possibly no vg is changed
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0);
if (rebInput.pTopic) {
......@@ -673,177 +668,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
sdbRelease(pSdb, pSub);
}
#if 0
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq subscribe;
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.consumerGroup;
SArray *newSub = subscribe.topicNames;
int32_t newTopicNum = subscribe.topicNum;
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int32_t oldTopicNum = 0;
bool createConsumer = false;
// create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
pConsumer = mndCreateConsumer(consumerId, cgroup);
createConsumer = true;
} else {
pConsumer->epoch++;
oldSub = pConsumer->currentTopics;
}
pConsumer->currentTopics = newSub;
if (oldSub != NULL) {
oldTopicNum = taosArrayGetSize(oldSub);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) {
// TODO: free memory
return -1;
}
int32_t i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) {
char *newTopicName = NULL;
char *oldTopicName = NULL;
if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic
oldTopicName = taosArrayGetP(oldSub, j);
j++;
} else if (j >= oldTopicNum) {
newTopicName = taosArrayGetP(newSub, i);
i++;
} else {
newTopicName = taosArrayGetP(newSub, i);
oldTopicName = taosArrayGetP(oldSub, j);
int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
if (comp == 0) {
// do nothing
oldTopicName = newTopicName = NULL;
i++;
j++;
continue;
} else if (comp < 0) {
oldTopicName = NULL;
i++;
} else {
newTopicName = NULL;
j++;
}
}
if (oldTopicName != NULL) {
ASSERT(newTopicName == NULL);
// cancel subscribe of old topic
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
ASSERT(pSub);
int32_t csz = taosArrayGetSize(pSub->consumers);
for (int32_t ci = 0; ci < csz; ci++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
if (pSubConsumer->consumerId == consumerId) {
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
for (int32_t vgi = 0; vgi < vgsz; vgi++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp, oldTopicName);
taosArrayPush(pSub->unassignedVg, pConsumerEp);
}
taosArrayRemove(pSub->consumers, ci);
break;
}
}
char *oldTopicNameDup = strdup(oldTopicName);
taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
} else if (newTopicName != NULL) {
ASSERT(oldTopicName == NULL);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
if (pTopic == NULL) {
mError("topic being subscribed not exist: %s", newTopicName);
continue;
}
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
bool createSub = false;
if (pSub == NULL) {
mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
newTopicName);
pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
createSub = true;
mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
}
SMqSubConsumer mqSubConsumer;
mqSubConsumer.consumerId = consumerId;
mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
taosArrayPush(pSub->consumers, &mqSubConsumer);
// if have un assigned vg, assign one to the consumer
if (taosArrayGetSize(pSub->unassignedVg) > 0) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = consumerId;
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
if (pConsumerEp->oldConsumerId == -1) {
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName,
pConsumerEp->consumerId);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
} else {
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName);
}
// to trigger rebalance at once, do not set status active
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
}
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw);
if (!createSub) mndReleaseSubscribe(pMnode, pSub);
mndReleaseTopic(pMnode, pTopic);
}
}
/*if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);*/
// persist consumerObj
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
mndTransDrop(pTrans);
if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
#endif
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
......@@ -82,7 +82,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withTagSchema, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
......@@ -146,7 +145,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTagSchema, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
......
......@@ -159,7 +159,6 @@ typedef struct {
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
char* qmsg;
STqPushHandle pushHandle;
// SRWLatch lock;
......
......@@ -31,6 +31,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->path = strdup(path);
pTq->pVnode = pVnode;
pTq->pWal = pWal;
#if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0);
......@@ -401,6 +402,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch);
}
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
if (pHeadWithCkSum == NULL) {
return -1;
}
walSetReaderCapacity(pExec->pWalReader, 2048);
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*));
......@@ -414,6 +422,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
break;
}
taosThreadMutexLock(&pExec->pWalReader->mutex);
if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
break;
}
if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) {
walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum);
} else {
walFetchBody(pExec->pWalReader, &pHeadWithCkSum);
}
SWalReadHead* pHead = &pHeadWithCkSum->head;
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
#if 0
SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
......@@ -443,14 +471,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
#endif
break;
}
break;
}
#endif
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
// table subscribe
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
qTaskInfo_t task = pExec->task[workerId];
ASSERT(task);
......@@ -484,6 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
}
// db subscribe
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0);
......@@ -789,7 +820,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pExec->withTbName = req.withTbName;
pExec->withSchema = req.withSchema;
pExec->withTag = req.withTag;
pExec->withTagSchema = req.withTagSchema;
pExec->qmsg = req.qmsg;
req.qmsg = NULL;
......
......@@ -138,6 +138,91 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
int32_t code;
// TODO: valid ver
if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if (code < 0) return -1;
}
if (!taosValidFile(pRead->pReadLogTFile)) {
return -1;
}
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
return -1;
}
code = walValidHeadCksum(pHead);
if (code != 0) {
wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
return 0;
}
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) {
int32_t code;
ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1;
return -1;
}
pRead->curVersion++;
return 0;
}
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) {
SWalReadHead *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
*ppHead = ptr;
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) {
return -1;
}
if (pReadHead->version != ver) {
wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (walValidBodyCksum(*ppHead) != 0) {
wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
pRead->curVersion = ver + 1;
return 0;
}
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) {
taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) {
......@@ -172,12 +257,14 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
if (code != sizeof(SWalHead)) {
return -1;
}
code = walValidHeadCksum(pRead->pHead);
if (code != 0) {
wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (pRead->capacity < pRead->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
if (ptr == NULL) {
......
......@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
......@@ -298,14 +296,14 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType;
// sync info
// sync info for sync module
pWal->writeHead.head.syncMeta = syncMeta;
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
// ftruncate
// TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
......@@ -313,7 +311,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
}
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
// ftruncate
// TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册