diff --git a/example/src/tmq.c b/example/src/tmq.c index 2abf915fd855b933f424ba0e5d81c1d9746dc5cc..b2cbf856c01340c05d4db113933a241067234ab3 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -17,6 +17,7 @@ #include #include #include +#include #include "taos.h" static int running = 1; @@ -47,6 +48,7 @@ int32_t init_env() { return -1; } taos_free_result(pRes); + sleep(1); pRes = taos_query(pConn, "use abc1"); if (taos_errno(pRes) != 0) { @@ -58,6 +60,7 @@ int32_t init_env() { pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"); if (taos_errno(pRes) != 0) { + assert(0); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); return -1; } @@ -265,10 +268,11 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { } int main(int argc, char* argv[]) { - int code; if (argc > 1) { printf("env init\n"); - code = init_env(); + if (init_env() < 0) { + return -1; + } create_topic(); } tmq_t* tmq = build_consumer(); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 757fa8e74b9823dedf24ae1773f0459568c87325..5a2dd502c1c909a1c1248981e71d940deb450210 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1314,7 +1314,6 @@ typedef struct { } SMqConsumerLostMsg; typedef struct { - int32_t topicNum; int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; SArray* topicNames; // SArray @@ -1322,22 +1321,27 @@ typedef struct { static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { int32_t tlen = 0; - tlen += taosEncodeFixedI32(buf, pReq->topicNum); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->cgroup); - for (int32_t i = 0; i < pReq->topicNum; i++) { + int32_t topicNum = taosArrayGetSize(pReq->topicNames); + tlen += taosEncodeFixedI32(buf, topicNum); + + for (int32_t i = 0; i < topicNum; i++) { tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); } return tlen; } static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { - buf = taosDecodeFixedI32(buf, &pReq->topicNum); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->cgroup); - pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); - for (int32_t i = 0; i < pReq->topicNum; i++) { + + int32_t topicNum; + buf = taosDecodeFixedI32(buf, &topicNum); + + pReq->topicNames = taosArrayInit(topicNum, sizeof(void*)); + for (int32_t i = 0; i < topicNum; i++) { char* name; buf = taosDecodeString(buf, &name); taosArrayPush(pReq->topicNames, &name); @@ -1969,7 +1973,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; char* qmsg; } SMqRebVgReq; @@ -1984,7 +1987,6 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR tlen += taosEncodeFixedI8(buf, pReq->withTbName); tlen += taosEncodeFixedI8(buf, pReq->withSchema); tlen += taosEncodeFixedI8(buf, pReq->withTag); - tlen += taosEncodeFixedI8(buf, pReq->withTagSchema); if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { tlen += taosEncodeString(buf, pReq->qmsg); } @@ -2001,7 +2003,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) buf = taosDecodeFixedI8(buf, &pReq->withTbName); buf = taosDecodeFixedI8(buf, &pReq->withSchema); buf = taosDecodeFixedI8(buf, &pReq->withTag); - buf = taosDecodeFixedI8(buf, &pReq->withTagSchema); if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { buf = taosDecodeString(buf, &pReq->qmsg); } @@ -2590,7 +2591,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; SArray* blockDataLen; // SArray SArray* blockData; // SArray SArray* blockTbName; // SArray @@ -2609,7 +2609,6 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp tlen += taosEncodeFixedI8(buf, pRsp->withTbName); tlen += taosEncodeFixedI8(buf, pRsp->withSchema); tlen += taosEncodeFixedI8(buf, pRsp->withTag); - tlen += taosEncodeFixedI8(buf, pRsp->withTagSchema); for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i); @@ -2632,7 +2631,6 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withTag); - buf = taosDecodeFixedI8(buf, &pRsp->withTagSchema); for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = 0; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index d41e797536a9c83ca2678e44eb7631b08814d67b..7b0d70b7695e5ad190ef491b2e9c16d32c61fbb0 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -192,7 +192,13 @@ int32_t walEndSnapshot(SWal *); SWalReadHandle *walOpenReadHandle(SWal *); void walCloseReadHandle(SWalReadHandle *); int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); -int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead); + +// only for tq usage +// int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead); +void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity); +int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead); +int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead); +int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead); // deprecated #if 0 diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index a33351373edb7476c490da11b3e1778b1c023cae..8096ce395a3aaf6bb22d9c355a63eebafabdf75a 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -13,13 +13,13 @@ * along with this program. If not, see . */ -#include "os.h" -#include "tdef.h" -#include "tname.h" +#include "catalog.h" #include "clientInt.h" #include "clientLog.h" -#include "catalog.h" +#include "os.h" #include "query.h" +#include "tdef.h" +#include "tname.h" int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); @@ -50,7 +50,13 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { SConnectRsp connectRsp = {0}; tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp); - assert(connectRsp.epSet.numOfEps > 0); + /*assert(connectRsp.epSet.numOfEps > 0);*/ + if (connectRsp.epSet.numOfEps == 0) { + taosMemoryFree(pMsg->pData); + setErrno(pRequest, TSDB_CODE_MND_APP_ERROR); + tsem_post(&pRequest->body.rspSem); + return code; + } if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); @@ -82,18 +88,20 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { +SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) { SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); pMsgSendInfo->requestObjRefId = pRequest->self; - pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->param = pRequest; - pMsgSendInfo->msgType = pRequest->type; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->param = pRequest; + pMsgSendInfo->msgType = pRequest->type; assert(pRequest != NULL); pMsgSendInfo->msgInfo = pRequest->body.requestMsg; - pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; + pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL) + ? genericRspCallback + : handleRequestRspFp[TMSG_INDEX(pRequest->type)]; return pMsgSendInfo; } @@ -114,7 +122,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (TSDB_CODE_MND_DB_NOT_EXIST == code) { SUseDbRsp usedbRsp = {0}; tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); - struct SCatalog *pCatalog = NULL; + struct SCatalog* pCatalog = NULL; if (usedbRsp.vgVersion >= 0) { int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 699fa7ecf54a74637f16c2aa43b9b0afba4ef04c..d576f28cb4d4bd004884ab1cf2f832f4245a8ba0 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -382,12 +382,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) return NULL; - /*pTmq->inWaiting = 0;*/ pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - /*pTmq->waitingRequest = 0;*/ - /*pTmq->readyRequest = 0;*/ pTmq->epStatus = 0; pTmq->epSkipCnt = 0; // set conf @@ -509,7 +506,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); SCMSubscribeReq req; - req.topicNum = sz; req.consumerId = tmq->consumerId; strcpy(req.cgroup, tmq->groupId); req.topicNames = taosArrayInit(sz, sizeof(void*)); @@ -519,12 +515,16 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { char* topicName = taosArrayGetP(container, i); SName name = {0}; +#if 0 char* dbName = getDbOfConnection(tmq->pTscObj); if (dbName == NULL) { return TMQ_RESP_ERR__FAIL; } - tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName)); +#endif + tNameSetDbName(&name, tmq->pTscObj->acctId, topicName, strlen(topicName)); +#if 0 tNameFromString(&name, topicName, T_NAME_TABLE); +#endif char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); if (topicFname == NULL) { @@ -542,7 +542,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); taosArrayPush(tmq->clientTopics, &topic); taosArrayPush(req.topicNames, &topicFname); +#if 0 taosMemoryFree(dbName); +#endif } int tlen = tSerializeSCMSubscribeReq(NULL, &req); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 90c5c03fa8f1e24321bbb542765885376226735c..efd4cbfcecc53176f658c66548523ae1b8f140b1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -126,7 +126,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) { return 0; } - int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) { int32_t tlen = 0; tlen += taosEncodeFixedI8(buf, pEp->inUse); @@ -2745,11 +2744,11 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1; if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1; if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; - if (0 == astLen && tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1; tEndEncode(&encoder); @@ -2771,6 +2770,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1; if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1; if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; @@ -2785,7 +2785,6 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR if (pReq->ast == NULL) return -1; if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; } else { - if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1; } tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index bf994e12a3b33cf561ad925d0aa8ff5c26962b06..cf1cd58540733eba2a6f3d970d9462414de9c3bd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -450,7 +450,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; SRWLatch lock; int32_t sqlLen; int32_t astLen; @@ -517,7 +516,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub // TODO put -1 into unassignVgs // SArray* unassignedVgs; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 78d54d273deaf8b4295b7f1ccf05224bb1e8e060..800a013c4f74ad6e22b7ee399d0ddf51775782e9 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -237,7 +237,6 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pSubNew->withTbName = pSub->withTbName; pSubNew->withSchema = pSub->withSchema; pSubNew->withTag = pSub->withTag; - pSubNew->withTagSchema = pSub->withTagSchema; pSubNew->vgNum = pSub->vgNum; pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); @@ -270,7 +269,6 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { tlen += taosEncodeFixedI8(buf, pSub->withTbName); tlen += taosEncodeFixedI8(buf, pSub->withSchema); tlen += taosEncodeFixedI8(buf, pSub->withTag); - tlen += taosEncodeFixedI8(buf, pSub->withTagSchema); void *pIter = NULL; int32_t sz = taosHashGetSize(pSub->consumerHash); @@ -297,7 +295,6 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { buf = taosDecodeFixedI8(buf, &pSub->withTbName); buf = taosDecodeFixedI8(buf, &pSub->withSchema); buf = taosDecodeFixedI8(buf, &pSub->withTag); - buf = taosDecodeFixedI8(buf, &pSub->withTagSchema); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e37bd60e125611c596fc49a9ccf335b375dbe662..6a1994d7b86b28fdfed346338ce0254312554a7e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -35,11 +35,6 @@ #define MND_SUBSCRIBE_REBALANCE_CNT 3 -enum { - MQ_SUBSCRIBE_STATUS__ACTIVE = 1, - MQ_SUBSCRIBE_STATUS__DELETED, -}; - static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); @@ -89,7 +84,6 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, pSub->withTbName = pTopic->withTbName; pSub->withSchema = pTopic->withSchema; pSub->withTag = pTopic->withTag; - pSub->withTagSchema = pTopic->withTagSchema; ASSERT(taosHashGetSize(pSub->consumerHash) == 1); @@ -115,7 +109,6 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri req.withTbName = pSub->withTbName; req.withSchema = pSub->withSchema; req.withTag = pSub->withTag; - req.withTagSchema = pSub->withTagSchema; strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); @@ -514,9 +507,11 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { // TODO replace assert with error check ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); + // if add more consumer to balanced subscribe, // possibly no vg is changed /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ + ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); if (rebInput.pTopic) { @@ -673,177 +668,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { sdbRelease(pSdb, pSub); } -#if 0 -static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { - SMnode *pMnode = pMsg->pNode; - char *msgStr = pMsg->rpcMsg.pCont; - SCMSubscribeReq subscribe; - tDeserializeSCMSubscribeReq(msgStr, &subscribe); - int64_t consumerId = subscribe.consumerId; - char *cgroup = subscribe.consumerGroup; - - SArray *newSub = subscribe.topicNames; - int32_t newTopicNum = subscribe.topicNum; - - taosArraySortString(newSub, taosArrayCompareString); - - SArray *oldSub = NULL; - int32_t oldTopicNum = 0; - bool createConsumer = false; - // create consumer if not exist - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); - if (pConsumer == NULL) { - // create consumer - pConsumer = mndCreateConsumer(consumerId, cgroup); - createConsumer = true; - } else { - pConsumer->epoch++; - oldSub = pConsumer->currentTopics; - } - pConsumer->currentTopics = newSub; - - if (oldSub != NULL) { - oldTopicNum = taosArrayGetSize(oldSub); - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); - if (pTrans == NULL) { - // TODO: free memory - return -1; - } - - int32_t i = 0, j = 0; - while (i < newTopicNum || j < oldTopicNum) { - char *newTopicName = NULL; - char *oldTopicName = NULL; - if (i >= newTopicNum) { - // encode unset topic msg to all vnodes related to that topic - oldTopicName = taosArrayGetP(oldSub, j); - j++; - } else if (j >= oldTopicNum) { - newTopicName = taosArrayGetP(newSub, i); - i++; - } else { - newTopicName = taosArrayGetP(newSub, i); - oldTopicName = taosArrayGetP(oldSub, j); - - int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName); - if (comp == 0) { - // do nothing - oldTopicName = newTopicName = NULL; - i++; - j++; - continue; - } else if (comp < 0) { - oldTopicName = NULL; - i++; - } else { - newTopicName = NULL; - j++; - } - } - - if (oldTopicName != NULL) { - ASSERT(newTopicName == NULL); - - // cancel subscribe of old topic - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName); - ASSERT(pSub); - int32_t csz = taosArrayGetSize(pSub->consumers); - for (int32_t ci = 0; ci < csz; ci++) { - SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci); - if (pSubConsumer->consumerId == consumerId) { - int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); - for (int32_t vgi = 0; vgi < vgsz; vgi++) { - SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); - mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp, oldTopicName); - taosArrayPush(pSub->unassignedVg, pConsumerEp); - } - taosArrayRemove(pSub->consumers, ci); - break; - } - } - char *oldTopicNameDup = strdup(oldTopicName); - taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup); - atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY); - /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/ - } else if (newTopicName != NULL) { - ASSERT(oldTopicName == NULL); - - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); - if (pTopic == NULL) { - mError("topic being subscribed not exist: %s", newTopicName); - continue; - } - - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName); - bool createSub = false; - if (pSub == NULL) { - mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup, - newTopicName); - pSub = mndCreateSubscription(pMnode, pTopic, cgroup); - createSub = true; - - mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg); - } - - SMqSubConsumer mqSubConsumer; - mqSubConsumer.consumerId = consumerId; - mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp)); - taosArrayPush(pSub->consumers, &mqSubConsumer); - - // if have un assigned vg, assign one to the consumer - if (taosArrayGetSize(pSub->unassignedVg) > 0) { - SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg); - pConsumerEp->oldConsumerId = pConsumerEp->consumerId; - pConsumerEp->consumerId = consumerId; - taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); - if (pConsumerEp->oldConsumerId == -1) { - mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName, - pConsumerEp->consumerId); - mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); - } else { - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName); - } - // to trigger rebalance at once, do not set status active - /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/ - } - - SSdbRaw *pRaw = mndSubActionEncode(pSub); - sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mndTransAppendRedolog(pTrans, pRaw); - - if (!createSub) mndReleaseSubscribe(pMnode, pSub); - mndReleaseTopic(pMnode, pTopic); - } - } - - /*if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);*/ - - // persist consumerObj - SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); - sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); - mndTransAppendRedolog(pTrans, pConsumerRaw); - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer); - return -1; - } - - mndTransDrop(pTrans); - if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer); - return TSDB_CODE_MND_ACTION_IN_PROGRESS; -} -#endif - static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } - -static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { - SSdb *pSdb = pMnode->pSdb; - sdbCancelFetch(pSdb, pIter); -} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index a4b98ba01a81cf67cbcf0a17e1ef29bef589675a..faddc3a1c533e4db9c80f54ce6e4f9fcb1a75d7e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -82,7 +82,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER); - SDB_SET_INT8(pRaw, dataPos, pTopic->withTagSchema, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); @@ -146,7 +145,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER); - SDB_GET_INT8(pRaw, dataPos, &pTopic->withTagSchema, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char)); @@ -234,6 +232,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) { sdbRelease(pSdb, pTopic); } +#if 0 static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { SName name = {0}; tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -243,6 +242,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { return mndAcquireDb(pMnode, db); } +#endif static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) { int32_t contLen = sizeof(SDDropTopicReq); @@ -262,15 +262,11 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq } static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { - if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) { + if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subscribeDbName[0] == 0) { terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; return -1; } - if ((pCreate->ast == NULL || pCreate->ast[0] == 0) && pCreate->subscribeDbName[0] == 0) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } return 0; } @@ -386,7 +382,7 @@ static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) { goto CREATE_TOPIC_OVER; } - pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name); + pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; goto CREATE_TOPIC_OVER; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 6e004a89fc6e3577b67a5364d8dbf3410f7e8c86..347d28f1ea2f04483f879c8e855445240fdab696 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -159,7 +159,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; char* qmsg; STqPushHandle pushHandle; // SRWLatch lock; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0b3feb5010f70bb2066090f2a8884d599610eb85..434c5bc5bb997973f544c7d74eb29b35b6594873 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -31,6 +31,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; + #if 0 pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0); @@ -401,6 +402,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch); } + SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); + if (pHeadWithCkSum == NULL) { + return -1; + } + + walSetReaderCapacity(pExec->pWalReader, 2048); + SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; rsp.blockData = taosArrayInit(0, sizeof(void*)); @@ -414,6 +422,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { break; } + taosThreadMutexLock(&pExec->pWalReader->mutex); + + if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) { + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, + TD_VID(pTq->pVnode), fetchOffset); + taosThreadMutexUnlock(&pExec->pWalReader->mutex); + break; + } + + if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) { + walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum); + } else { + walFetchBody(pExec->pWalReader, &pHeadWithCkSum); + } + + SWalReadHead* pHead = &pHeadWithCkSum->head; + + taosThreadMutexUnlock(&pExec->pWalReader->mutex); + +#if 0 SWalReadHead* pHead; if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) { // TODO: no more log, set timer to wait blocking time @@ -443,14 +471,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; #endif - break; - } + break; + } +#endif vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + // table subscribe if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { qTaskInfo_t task = pExec->task[workerId]; ASSERT(task); @@ -484,6 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayPush(rsp.blockData, &buf); rsp.blockNum++; } + // db subscribe } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { STqReadHandle* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pCont, 0); @@ -789,7 +820,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pExec->withTbName = req.withTbName; pExec->withSchema = req.withSchema; pExec->withTag = req.withTag; - pExec->withTagSchema = req.withTagSchema; pExec->qmsg = req.qmsg; req.qmsg = NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index a3f8c3aed5eed8de0732ac7a9a1d8dc827af5a6e..31cc629e0a31d194d042e9eb1fde01a8e14585ad 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -198,7 +198,7 @@ static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo* pInfo) { SParseContext* pParCxt = pCxt->pParseCxt; - SName name; + SName name; tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName)); char dbFname[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(&name, dbFname); @@ -560,8 +560,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes; } } else if (nodesIsComparisonOp(pOp)) { - if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || - TSDB_DATA_TYPE_BLOB == rdt.type) { + if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); } if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { @@ -569,7 +568,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { } pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; - } else if (nodesIsJsonOp(pOp)){ + } else if (nodesIsJsonOp(pOp)) { if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); } @@ -588,7 +587,9 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) { } static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { - SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet}; + SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog, + .pRpc = pCxt->pParseCxt->pTransporter, + .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet}; if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); } @@ -1268,7 +1269,7 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* static EDealRes checkStateExpr(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { STranslateContext* pCxt = pContext; - SColumnNode* pCol = (SColumnNode*)pNode; + SColumnNode* pCol = (SColumnNode*)pNode; if (!IS_INTEGER_TYPE(pCol->node.resType.type)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE); } @@ -1345,7 +1346,8 @@ static int32_t translateFrom(STranslateContext* pCxt, SSelectStmt* pSelect) { } static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) { - if ((NULL != pSelect->pLimit && pSelect->pLimit->offset < 0) || (NULL != pSelect->pSlimit && pSelect->pSlimit->offset < 0)) { + if ((NULL != pSelect->pLimit && pSelect->pLimit->offset < 0) || + (NULL != pSelect->pSlimit && pSelect->pSlimit->offset < 0)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_OFFSET_LESS_ZERO); } @@ -1438,7 +1440,7 @@ static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* p SExprNode* pLeftExpr = (SExprNode*)pLeft; SExprNode* pRightExpr = (SExprNode*)pRight; if (!dataTypeEqual(&pLeftExpr->resType, &pRightExpr->resType)) { - SNode* pRightFunc = NULL; + SNode* pRightFunc = NULL; int32_t code = createCastFunc(pCxt, pRight, pLeftExpr->resType, &pRightFunc); if (TSDB_CODE_SUCCESS != code) { return code; @@ -1650,7 +1652,8 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) { (TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) || (pKeep2->isDuration && (TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit, pKeep2->unit); + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit, + pKeep2->unit); } int32_t daysToKeep0 = getBigintFromValueNode(pKeep0); @@ -1659,7 +1662,7 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) { if (daysToKeep0 < TSDB_MIN_KEEP || daysToKeep1 < TSDB_MIN_KEEP || daysToKeep2 < TSDB_MIN_KEEP || daysToKeep0 > TSDB_MAX_KEEP || daysToKeep1 > TSDB_MAX_KEEP || daysToKeep2 > TSDB_MAX_KEEP) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_VALUE, daysToKeep0, daysToKeep1, daysToKeep2, - TSDB_MIN_KEEP, TSDB_MAX_KEEP); + TSDB_MIN_KEEP, TSDB_MAX_KEEP); } if (!((daysToKeep0 <= daysToKeep1) && (daysToKeep1 <= daysToKeep2))) { @@ -1691,7 +1694,8 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete return TSDB_CODE_SUCCESS; } -static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions, bool alter) { +static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions, + bool alter) { if (NULL == pOptions->pDaysPerFile && NULL == pOptions->pKeep) { return TSDB_CODE_SUCCESS; } @@ -1699,7 +1703,7 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa int64_t daysToKeep0 = GET_OPTION_VAL(nodesListGetNode(pOptions->pKeep, 0), alter ? -1 : TSDB_DEFAULT_KEEP); if (alter && (-1 == daysPerFile || -1 == daysToKeep0)) { SDbCfgInfo dbCfg; - int32_t code = getDBCfg(pCxt, pDbName, &dbCfg); + int32_t code = getDBCfg(pCxt, pDbName, &dbCfg); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -1712,7 +1716,8 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa return TSDB_CODE_SUCCESS; } -static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions, bool alter) { +static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions, + bool alter) { int32_t code = checkRangeOption(pCxt, "totalBlocks", pOptions->pNumOfBlocks, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS); if (TSDB_CODE_SUCCESS == code) { @@ -1918,7 +1923,7 @@ static int32_t checTableFactorOption(STranslateContext* pCxt, SValueNode* pVal) } if (pVal->datum.d < TSDB_MIN_DB_FILE_FACTOR || pVal->datum.d > TSDB_MAX_DB_FILE_FACTOR) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_F_RANGE_OPTION, "file_factor", pVal->datum.d, - TSDB_MIN_DB_FILE_FACTOR, TSDB_MAX_DB_FILE_FACTOR); + TSDB_MIN_DB_FILE_FACTOR, TSDB_MAX_DB_FILE_FACTOR); } } return TSDB_CODE_SUCCESS; @@ -1945,7 +1950,7 @@ static int32_t checkTableTags(STranslateContext* pCxt, SCreateTableStmt* pStmt) SNode* pNode; FOREACH(pNode, pStmt->pTags) { SColumnDefNode* pCol = (SColumnDefNode*)pNode; - if(pCol->dataType.type == TSDB_DATA_TYPE_JSON && LIST_LENGTH(pStmt->pTags) > 1){ + if (pCol->dataType.type == TSDB_DATA_TYPE_JSON && LIST_LENGTH(pStmt->pTags) > 1) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG); } } @@ -1960,8 +1965,10 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs if (1 != LIST_LENGTH(pFuncs)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION); } - SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0); - SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet}; + SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0); + SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog, + .pRpc = pCxt->pParseCxt->pTransporter, + .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet}; if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); } @@ -2016,10 +2023,10 @@ static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchem typedef struct SSampleAstInfo { const char* pDbName; const char* pTableName; - SNodeList* pFuncs; - SNode* pInterval; - SNode* pOffset; - SNode* pSliding; + SNodeList* pFuncs; + SNode* pInterval; + SNode* pOffset; + SNode* pSliding; STableMeta* pRollupTableMeta; } SSampleAstInfo; @@ -2089,8 +2096,8 @@ static SNode* makeIntervalVal(SRetention* pRetension, int8_t precision) { return NULL; } int64_t timeVal = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit); - char buf[20] = {0}; - int32_t len = snprintf(buf, sizeof(buf), "%"PRId64"%c", timeVal, pRetension->freqUnit); + char buf[20] = {0}; + int32_t len = snprintf(buf, sizeof(buf), "%" PRId64 "%c", timeVal, pRetension->freqUnit); pVal->literal = strndup(buf, len); if (NULL == pVal->literal) { nodesDestroyNode(pVal); @@ -2133,7 +2140,7 @@ static SNodeList* createRollupFuncs(SCreateTableStmt* pStmt) { SNode* pFunc = NULL; FOREACH(pFunc, pStmt->pOptions->pFuncs) { SNode* pCol = NULL; - bool primaryKey = true; + bool primaryKey = true; FOREACH(pCol, pStmt->pCols) { if (primaryKey) { primaryKey = false; @@ -2150,7 +2157,7 @@ static SNodeList* createRollupFuncs(SCreateTableStmt* pStmt) { } static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precision) { - int32_t numOfField = LIST_LENGTH(pStmt->pCols) + LIST_LENGTH(pStmt->pTags); + int32_t numOfField = LIST_LENGTH(pStmt->pCols) + LIST_LENGTH(pStmt->pTags); STableMeta* pMeta = taosMemoryCalloc(1, sizeof(STableMeta) + numOfField * sizeof(SSchema)); if (NULL == pMeta) { return NULL; @@ -2161,7 +2168,7 @@ static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precisi pMeta->tableInfo.numOfColumns = LIST_LENGTH(pStmt->pCols); int32_t index = 0; - SNode* pCol = NULL; + SNode* pCol = NULL; FOREACH(pCol, pStmt->pCols) { toSchema((SColumnDefNode*)pCol, index + 1, pMeta->schema + index); ++index; @@ -2175,8 +2182,8 @@ static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precisi return pMeta; } -static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt, - SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, SSampleAstInfo* pInfo) { +static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, SRetention* pRetension, + int8_t precision, SSampleAstInfo* pInfo) { pInfo->pDbName = pStmt->dbName; pInfo->pTableName = pStmt->tableName; pInfo->pFuncs = createRollupFuncs(pStmt); @@ -2188,10 +2195,10 @@ static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt, return TSDB_CODE_SUCCESS; } -static int32_t getRollupAst(STranslateContext* pCxt, - SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, char** pAst, int32_t* pLen) { +static int32_t getRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, + char** pAst, int32_t* pLen) { SSampleAstInfo info = {0}; - int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info); + int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info); if (TSDB_CODE_SUCCESS == code) { code = buildSampleAst(pCxt, &info, pAst, pLen); } @@ -2201,17 +2208,17 @@ static int32_t getRollupAst(STranslateContext* pCxt, static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) { SDbCfgInfo dbCfg = {0}; - int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg); - int32_t num = taosArrayGetSize(dbCfg.pRetensions); + int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg); + int32_t num = taosArrayGetSize(dbCfg.pRetensions); if (TSDB_CODE_SUCCESS != code || num < 2) { return code; } for (int32_t i = 1; i < num; ++i) { - SRetention *pRetension = taosArrayGet(dbCfg.pRetensions, i); + SRetention* pRetension = taosArrayGet(dbCfg.pRetensions, i); STranslateContext cxt = {0}; initTranslateContext(pCxt->pParseCxt, &cxt); - code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, - 1 == i ? &pReq->pAst1 : &pReq->pAst2, 1 == i ? &pReq->ast1Len : &pReq->ast2Len); + code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, 1 == i ? &pReq->pAst1 : &pReq->pAst2, + 1 == i ? &pReq->ast1Len : &pReq->ast2Len); destroyTranslateContext(&cxt); if (TSDB_CODE_SUCCESS != code) { break; @@ -2245,7 +2252,7 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { SMCreateStbReq createReq = {0}; - int32_t code = checkCreateTable(pCxt, pStmt); + int32_t code = checkCreateTable(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { code = buildCreateStbReq(pCxt, pStmt, &createReq); } @@ -2289,7 +2296,8 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableStmt* pStmt) { SName tableName; - return doTranslateDropSuperTable(pCxt, toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pStmt->ignoreNotExists); + return doTranslateDropSuperTable(pCxt, toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), + pStmt->ignoreNotExists); } static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterReq) { @@ -2421,7 +2429,7 @@ static int32_t nodeTypeToShowType(ENodeType nt) { case QUERY_NODE_SHOW_QUERIES_STMT: return TSDB_MGMT_TABLE_QUERIES; case QUERY_NODE_SHOW_VARIABLE_STMT: - return 0; // todo + return 0; // todo default: break; } @@ -2463,8 +2471,8 @@ static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexSt pInfo->pOffset = nodesCloneNode(pStmt->pOptions->pOffset); pInfo->pSliding = nodesCloneNode(pStmt->pOptions->pSliding); if (NULL == pInfo->pFuncs || NULL == pInfo->pInterval || - (NULL != pStmt->pOptions->pOffset && NULL == pInfo->pOffset) || - (NULL != pStmt->pOptions->pSliding && NULL == pInfo->pSliding)) { + (NULL != pStmt->pOptions->pOffset && NULL == pInfo->pOffset) || + (NULL != pStmt->pOptions->pSliding && NULL == pInfo->pSliding)) { return TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; @@ -2472,7 +2480,7 @@ static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexSt static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) { SSampleAstInfo info = {0}; - int32_t code = buildSampleAstInfoByIndex(pCxt, pStmt, &info); + int32_t code = buildSampleAstInfoByIndex(pCxt, pStmt, &info); if (TSDB_CODE_SUCCESS == code) { code = buildSampleAst(pCxt, &info, pAst, pLen); } @@ -2618,9 +2626,9 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) { SName name; - // tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); - // tNameGetFullDbName(&name, pReq->name); - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name); + tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); + tNameGetFullDbName(&name, pReq->name); + /*tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);*/ pReq->igExists = pStmt->ignoreExists; pReq->withTbName = pStmt->pOptions->withTable; pReq->withSchema = pStmt->pOptions->withSchema; @@ -2633,16 +2641,19 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS int32_t code = TSDB_CODE_SUCCESS; + const char* dbName; if (NULL != pStmt->pQuery) { - strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName); + dbName = ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName; pCxt->pParseCxt->topicQuery = true; code = translateQuery(pCxt, pStmt->pQuery); if (TSDB_CODE_SUCCESS == code) { code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); } } else { - strcpy(pReq->subscribeDbName, pStmt->subscribeDbName); + dbName = pStmt->subscribeDbName; } + tNameSetDbName(&name, pCxt->pParseCxt->acctId, dbName, strlen(dbName)); + tNameGetFullDbName(&name, pReq->subscribeDbName); return code; } @@ -2654,8 +2665,9 @@ static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; - if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList && - NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) { + if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && + NULL == pSelect->pGroupByList && NULL == pSelect->pLimit && NULL == pSelect->pSlimit && + NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) { return TSDB_CODE_SUCCESS; } } @@ -2665,7 +2677,7 @@ static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { SCMCreateTopicReq createReq = {0}; - int32_t code = checkCreateTopic(pCxt, pStmt); + int32_t code = checkCreateTopic(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { code = buildCreateTopicReq(pCxt, pStmt, &createReq); } @@ -2763,7 +2775,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt return TSDB_CODE_SUCCESS; } -static int32_t readFromFile(char* pName, int32_t *len, char **buf) { +static int32_t readFromFile(char* pName, int32_t* len, char** buf) { int64_t filesize = 0; if (taosStatFile(pName, &filesize, NULL) < 0) { return TAOS_SYSTEM_ERROR(errno); @@ -3408,8 +3420,8 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema, SKVRowBuilder* pBuilder) { - if(pSchema->type == TSDB_DATA_TYPE_JSON){ - if(pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){ + if (pSchema->type == TSDB_DATA_TYPE_JSON) { + if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal); } @@ -3420,10 +3432,11 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS return pCxt->errCode; } - if(pVal->node.resType.type == TSDB_DATA_TYPE_NULL){ + if (pVal->node.resType.type == TSDB_DATA_TYPE_NULL) { // todo - }else{ - tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p), IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]); + } else { + tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p), + IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]); } return TSDB_CODE_SUCCESS; @@ -3692,7 +3705,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) { STranslateContext cxt = {0}; - int32_t code = initTranslateContext(pParseCxt, &cxt); + int32_t code = initTranslateContext(pParseCxt, &cxt); if (TSDB_CODE_SUCCESS == code) { code = fmFuncMgtInit(); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index faf5182e26204e912cbe2df27435e62fdec25c33..64df4f02bf7e826589f8b2b3d61f9972328dacd4 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -46,7 +46,7 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { SQueryTableRsp rsp = {.code = code}; int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); - void * msg = rpcMallocCont(contLen); + void *msg = rpcMallocCont(contLen); tSerializeSQueryTableRsp(msg, contLen, &rsp); SRpcMsg rpcRsp = { @@ -87,7 +87,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); - void * pRsp = rpcMallocCont(contLen); + void *pRsp = rpcMallocCont(contLen); tSerializeSExplainRsp(pRsp, contLen, &rsp); SRpcMsg rpcRsp = { @@ -107,7 +107,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); - void * pRsp = rpcMallocCont(contLen); + void *pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); SRpcMsg rpcRsp = { @@ -223,7 +223,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { showRsp.tableMeta.numOfColumns = cols; int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp); - void * pBuf = rpcMallocCont(bufLen); + void *pBuf = rpcMallocCont(bufLen); tSerializeSShowRsp(pBuf, bufLen, &showRsp); SRpcMsg rpcMsg = { @@ -403,8 +403,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { bool queryDone = false; SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; bool needStop = false; - SQWTaskCtx * handles = NULL; - SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWTaskCtx *handles = NULL; + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -538,7 +538,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -620,7 +620,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSchedulerHbReq req = {0}; - SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; if (NULL == pMsg->pCont) { QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 6d598699c5eab80e64fa7582fc7fac3de3bd4d25..70a3559cd9f92d6b48d5c6aee68cd72eb05b4424 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -138,6 +138,91 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { return 0; } +void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; } + +int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { + int32_t code; + // TODO: valid ver + + if (pRead->curVersion != ver) { + code = walReadSeekVer(pRead, ver); + if (code < 0) return -1; + } + + if (!taosValidFile(pRead->pReadLogTFile)) { + return -1; + } + + code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead)); + if (code != sizeof(SWalHead)) { + return -1; + } + + code = walValidHeadCksum(pHead); + + if (code != 0) { + wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + + return 0; +} + +int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) { + int32_t code; + + ASSERT(pRead->curVersion == pHead->head.version); + + code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR); + if (code < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + pRead->curVersion = -1; + return -1; + } + + pRead->curVersion++; + + return 0; +} + +int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) { + SWalReadHead *pReadHead = &((*ppHead)->head); + int64_t ver = pReadHead->version; + + if (pRead->capacity < pReadHead->bodyLen) { + void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + pReadHead->bodyLen); + if (ptr == NULL) { + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; + return -1; + } + *ppHead = ptr; + pRead->capacity = pReadHead->bodyLen; + } + + if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) { + return -1; + } + + if (pReadHead->version != ver) { + wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, + ver); + pRead->curVersion = -1; + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + + if (walValidBodyCksum(*ppHead) != 0) { + wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver); + pRead->curVersion = -1; + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + + pRead->curVersion = ver + 1; + return 0; +} + int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) { taosThreadMutexLock(&pRead->mutex); if (walReadWithHandle(pRead, ver) < 0) { @@ -172,12 +257,14 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { if (code != sizeof(SWalHead)) { return -1; } + code = walValidHeadCksum(pRead->pHead); if (code != 0) { wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } + if (pRead->capacity < pRead->pHead->head.bodyLen) { void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen); if (ptr == NULL) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 530930c2610c9f57865c7493f4a4bc4153221ecd..207b7d84219750cf1d2110d30b4b6b6f9d69e842 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE - #include "os.h" #include "taoserror.h" #include "tchecksum.h" @@ -298,14 +296,14 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; - // sync info + // sync info for sync module pWal->writeHead.head.syncMeta = syncMeta; pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { - // ftruncate + // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); @@ -313,7 +311,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog } if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { - // ftruncate + // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));