未验证 提交 1ac30b77 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20169 from taosdata/feature/3_liaohj

refactor: do some internal refactor and add some logs for tmq.
...@@ -29,11 +29,10 @@ extern "C" { ...@@ -29,11 +29,10 @@ extern "C" {
#define calloc CALLOC_FUNC_TAOS_FORBID #define calloc CALLOC_FUNC_TAOS_FORBID
#define realloc REALLOC_FUNC_TAOS_FORBID #define realloc REALLOC_FUNC_TAOS_FORBID
#define free FREE_FUNC_TAOS_FORBID #define free FREE_FUNC_TAOS_FORBID
#ifdef strdup #ifdef strdup
#undef strdup #undef strdup
#define strdup STRDUP_FUNC_TAOS_FORBID #define strdup STRDUP_FUNC_TAOS_FORBID
#endif #endif
#endif // ifndef ALLOW_FORBID_FUNC #endif // ifndef ALLOW_FORBID_FUNC
#endif // if !defined(WINDOWS) #endif // if !defined(WINDOWS)
......
...@@ -15,17 +15,11 @@ ...@@ -15,17 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h"
#include "mndSubscribe.h" #include "mndSubscribe.h"
#include "mndTopic.h" #include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h" #include "tcompare.h"
#include "tname.h" #include "tname.h"
...@@ -209,6 +203,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { ...@@ -209,6 +203,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return 0;
FAIL: FAIL:
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
...@@ -580,6 +575,10 @@ static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const ch ...@@ -580,6 +575,10 @@ static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const ch
return 0; return 0;
} }
static void* topicNameDup(void* p){
return taosStrdup((char*) p);
}
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
char *msgStr = pMsg->pCont; char *msgStr = pMsg->pCont;
...@@ -616,16 +615,17 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -616,16 +615,17 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
// set the update type
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
// all subscribed topics should re-balance.
taosArrayDestroy(pConsumerNew->rebNewTopics); taosArrayDestroy(pConsumerNew->rebNewTopics);
pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance. pConsumerNew->rebNewTopics = pTopicList;
subscribe.topicNames = NULL; subscribe.topicNames = NULL;
for (int32_t i = 0; i < newTopicNum; i++) {
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i));
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
}
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
...@@ -646,17 +646,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -646,17 +646,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto _over; goto _over;
} }
// set the update type
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
for (int32_t i = 0; i < newTopicNum; i++) { int32_t oldTopicNum = (pExistedConsumer->currentTopics)? taosArrayGetSize(pExistedConsumer->currentTopics):0;
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i));
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
}
int32_t oldTopicNum = 0;
if (pExistedConsumer->currentTopics) {
oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
}
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while (i < oldTopicNum || j < newTopicNum) { while (i < oldTopicNum || j < newTopicNum) {
...@@ -692,11 +687,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -692,11 +687,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} }
} }
if (pExistedConsumer && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && // no topics need to be rebalanced
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
/*pConsumerNew->updateType = */
/*}*/
goto _over; goto _over;
} }
...@@ -718,8 +710,9 @@ _over: ...@@ -718,8 +710,9 @@ _over:
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
} }
// TODO: replace with destroy subscribe msg // TODO: replace with destroy subscribe msg
if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
return code; return code;
} }
...@@ -750,12 +743,12 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { ...@@ -750,12 +743,12 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
CM_ENCODE_OVER: CM_ENCODE_OVER:
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
if (terrno != 0) { if (terrno != 0) {
mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
return NULL; return NULL;
} }
mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer); mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
return pRaw; return pRaw;
} }
...@@ -823,8 +816,8 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { ...@@ -823,8 +816,8 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
} }
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mDebug("consumer:0x%" PRIx64 " perform delete action, status:%s", pConsumer->consumerId, mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId,
mndConsumerStatusName(pConsumer->status)); pConsumer->status, mndConsumerStatusName(pConsumer->status));
tDeleteSMqConsumerObj(pConsumer); tDeleteSMqConsumerObj(pConsumer);
return 0; return 0;
} }
...@@ -1075,22 +1068,23 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -1075,22 +1068,23 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
// consumer group // consumer group
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN); STR_TO_VARSTR(cgroup, pConsumer->cgroup);
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
// client id // client id
char clientId[256 + VARSTR_HEADER_SIZE] = {0}; char clientId[256 + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(clientId), pConsumer->clientId, 256); STR_TO_VARSTR(clientId, pConsumer->clientId);
varDataSetLen(clientId, strlen(varDataVal(clientId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false); colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);
// status // status
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20); const char* pStatusName = mndConsumerStatusName(pConsumer->status);
varDataSetLen(status, strlen(varDataVal(status))); STR_TO_VARSTR(status, pStatusName);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)status, false); colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
...@@ -1123,8 +1117,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -1123,8 +1117,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
numOfRows++; numOfRows++;
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
pBlock->info.rows = numOfRows;
} }
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
......
...@@ -16,15 +16,10 @@ ...@@ -16,15 +16,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndSubscribe.h" #include "mndSubscribe.h"
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndScheduler.h" #include "mndScheduler.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h" #include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "tcompare.h" #include "tcompare.h"
#include "tname.h" #include "tname.h"
...@@ -1041,7 +1036,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock ...@@ -1041,7 +1036,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
} }
// do not show for cleared subscription // do not show for cleared subscription
#if 1
int32_t sz = taosArrayGetSize(pSub->unassignedVgs); int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
...@@ -1087,8 +1081,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock ...@@ -1087,8 +1081,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
numOfRows++; numOfRows++;
} }
#endif
pBlock->info.rows = numOfRows; pBlock->info.rows = numOfRows;
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
......
...@@ -217,7 +217,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { ...@@ -217,7 +217,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
...@@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s", tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
...@@ -604,7 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -604,7 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = -1; code = -1;
} }
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
......
...@@ -113,9 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -113,9 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
return -1; return -1;
} }
ASSERT(pRsp->withTbName == false); ASSERT(!(pRsp->withTbName || pRsp->withSchema));
ASSERT(pRsp->withSchema == false);
return 0; return 0;
} }
......
...@@ -197,7 +197,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ...@@ -197,7 +197,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
return -1; return -1;
} }
tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey, tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
(int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode)); (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
void* buf = taosMemoryCalloc(1, vlen); void* buf = taosMemoryCalloc(1, vlen);
......
...@@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
taosWUnLockLatch(&pHandle->pushHandle.lock); taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64,
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
rsp.reqOffset, rsp.rspOffset); rsp.reqOffset, rsp.rspOffset);
...@@ -210,25 +210,30 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -210,25 +210,30 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg); int32_t len = msgLen - sizeof(SSubmitReq2Msg);
tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
TMSG_INFO(msgType), msg, pReq, len);
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost // lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->pushLock);
if (taosHashGetSize(pTq->pPushMgr) != 0) {
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) {
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
void* data = taosMemoryMalloc(len); void* data = taosMemoryMalloc(len);
if (data == NULL) { if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory"); tqError("failed to copy data for stream since out of memory");
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
taosArrayDestroy(cachedKeyLens); taosArrayDestroy(cachedKeyLens);
// unlock
taosWUnLockLatch(&pTq->pushLock);
return -1; return -1;
} }
memcpy(data, pReq, len); memcpy(data, pReq, len);
void* pIter = NULL; void* pIter = NULL;
...@@ -262,7 +267,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -262,7 +267,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
}; };
qStreamSetScanMemData(task, submit); qStreamSetScanMemData(task, submit);
// exec // here start to scan submit block to extract the subscribed data
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
...@@ -278,7 +283,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -278,7 +283,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
pRsp->blockNum++; pRsp->blockNum++;
} }
tqDebug("vgId:%d, tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey, tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", pTq->pVnode->config.vgId, pPushEntry->subKey,
pRsp->blockNum); pRsp->blockNum);
if (pRsp->blockNum > 0) { if (pRsp->blockNum > 0) {
// set offset // set offset
...@@ -295,6 +300,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -295,6 +300,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqPushDataRsp(pTq, pPushEntry); tqPushDataRsp(pTq, pPushEntry);
} }
} }
// delete entry // delete entry
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
void* key = taosArrayGetP(cachedKeys, i); void* key = taosArrayGetP(cachedKeys, i);
......
...@@ -291,10 +291,15 @@ void tqCloseReader(STqReader* pReader) { ...@@ -291,10 +291,15 @@ void tqCloseReader(STqReader* pReader) {
} }
int32_t tqSeekVer(STqReader* pReader, int64_t ver) { int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
// todo set the correct vgId
tqDebug("tmq poll: vgId:%d wal seek to version:%"PRId64, 0, ver);
if (walReadSeekVer(pReader->pWalReader, ver) < 0) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
tqError("tmq poll: wal reader failed to seek to ver:%"PRId64, ver);
return -1; return -1;
} else {
tqDebug("tmq poll: wal reader seek to ver:%"PRId64, ver);
return 0;
} }
return 0;
} }
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
...@@ -302,28 +307,33 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -302,28 +307,33 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (1) { while (1) {
if (!fromProcessedMsg) { if (!fromProcessedMsg) {
if (walNextValidMsg(pReader->pWalReader) < 0) { SWalReader* pWalReader = pReader->pWalReader;
pReader->ver =
pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped); if (walNextValidMsg(pWalReader) < 0) {
pReader->ver = pWalReader->curVersion - (pWalReader->curInvalid | pWalReader->curStopped);
ret->offset.type = TMQ_OFFSET__LOG; ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver; ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
return -1; return -1;
} }
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); void* body = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int64_t ver = pReader->pWalReader->pHead->head.version; int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pWalReader->pHead->head.version;
tqDebug("tmq poll: extract submit msg from wal, version:%"PRId64" len:%d", ver, bodyLen);
#if 0 #if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { if (pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
// TODO do filter // TODO do filter
ret->fetchType = FETCH_TYPE__META; ret->fetchType = FETCH_TYPE__META;
ret->meta = pReader->pWalReader->pHead->head.body; ret->meta = pWalReader->pHead->head.body;
return 0; return 0;
} else { } else {
#endif #endif
tqReaderSetSubmitReq2(pReader, body, bodyLen, ver); tqReaderSetSubmitReq2(pReader, body, bodyLen, ver);
/*tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);*/ /*tqReaderSetDataMsg(pReader, body, pWalReader->pHead->head.version);*/
#if 0 #if 0
} }
#endif #endif
...@@ -358,7 +368,7 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v ...@@ -358,7 +368,7 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v
// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; // if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
// while (true) { // while (true) {
// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1; // if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen, // tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
// pReader->msgIter.len, pReader->msgIter.uid); // pReader->msgIter.len, pReader->msgIter.uid);
// if (pReader->pBlock == NULL) break; // if (pReader->pBlock == NULL) break;
// } // }
...@@ -371,10 +381,8 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v ...@@ -371,10 +381,8 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v
#endif #endif
int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
ASSERT(pReader->msg2.msgStr == NULL); ASSERT(pReader->msg2.msgStr == NULL && msgStr && msgLen && (ver >= 0));
ASSERT(msgStr);
ASSERT(msgLen);
ASSERT(ver >= 0);
pReader->msg2.msgStr = msgStr; pReader->msg2.msgStr = msgStr;
pReader->msg2.msgLen = msgLen; pReader->msg2.msgLen = msgLen;
pReader->msg2.ver = ver; pReader->msg2.ver = ver;
...@@ -421,7 +429,10 @@ bool tqNextDataBlock(STqReader* pReader) { ...@@ -421,7 +429,10 @@ bool tqNextDataBlock(STqReader* pReader) {
#endif #endif
bool tqNextDataBlock2(STqReader* pReader) { bool tqNextDataBlock2(STqReader* pReader) {
if (pReader->msg2.msgStr == NULL) return false; if (pReader->msg2.msgStr == NULL) {
return false;
}
ASSERT(pReader->setMsg == 1); ASSERT(pReader->setMsg == 1);
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
...@@ -528,7 +539,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) ...@@ -528,7 +539,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader)
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table", "), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion); pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion);
pReader->cachedSchemaSuid = 0; pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1; return -1;
...@@ -538,7 +549,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) ...@@ -538,7 +549,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader)
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) { if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer); pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0; pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1; return -1;
......
...@@ -1273,8 +1273,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -1273,8 +1273,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit; goto _exit;
} }
for (int32_t i = 1; i < nColData; i++) { for (int32_t j = 1; j < nColData; j++) {
if (aColData[i].nVal != aColData[0].nVal) { if (aColData[j].nVal != aColData[0].nVal) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
} }
...@@ -1308,8 +1308,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -1308,8 +1308,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1); SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
// create table // create table
if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
0) { // create table success // create table success
if (newTbUids == NULL && if (newTbUids == NULL &&
(newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) { (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
...@@ -1339,7 +1339,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -1339,7 +1339,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
pSubmitRsp->affectedRows += affectedRows; pSubmitRsp->affectedRows += affectedRows;
} }
// update table uid list // update the affected table uid list
if (taosArrayGetSize(newTbUids) > 0) { if (taosArrayGetSize(newTbUids) > 0) {
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
(int32_t)taosArrayGetSize(newTbUids)); (int32_t)taosArrayGetSize(newTbUids));
......
...@@ -133,8 +133,7 @@ typedef struct { ...@@ -133,8 +133,7 @@ typedef struct {
int64_t snapshotVer; int64_t snapshotVer;
// const SSubmitReq* pReq; // const SSubmitReq* pReq;
SPackedData submit; SPackedData submit;
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
int8_t recoverStep; int8_t recoverStep;
......
...@@ -850,32 +850,32 @@ static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTa ...@@ -850,32 +850,32 @@ static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTa
tagVal.cid = pColInfo->info.colId; tagVal.cid = pColInfo->info.colId;
if (p1->pTagVal == NULL) { if (p1->pTagVal == NULL) {
colDataSetNULL(pColInfo, i); colDataSetNULL(pColInfo, i);
} } else {
const char* p = metaGetTableTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
const char* p = metaGetTableTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) { colDataSetNULL(pColInfo, i);
colDataSetNULL(pColInfo, i); } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { colDataSetVal(pColInfo, i, p, false);
colDataSetVal(pColInfo, i, p, false); } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1); varDataSetLen(tmp, tagVal.nData);
varDataSetLen(tmp, tagVal.nData); memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); colDataSetVal(pColInfo, i, tmp, false);
colDataSetVal(pColInfo, i, tmp, false);
#if TAG_FILTER_DEBUG #if TAG_FILTER_DEBUG
qDebug("tagfilter varch:%s", tmp + 2); qDebug("tagfilter varch:%s", tmp + 2);
#endif #endif
taosMemoryFree(tmp); taosMemoryFree(tmp);
} else { } else {
colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false); colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
#if TAG_FILTER_DEBUG #if TAG_FILTER_DEBUG
if (pColInfo->info.type == TSDB_DATA_TYPE_INT) { if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
qDebug("tagfilter int:%d", *(int*)(&tagVal.i64)); qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
} else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) { } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
qDebug("tagfilter double:%f", *(double*)(&tagVal.i64)); qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
} }
#endif #endif
}
} }
} }
} }
......
...@@ -1035,8 +1035,9 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc ...@@ -1035,8 +1035,9 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT((pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE )&& (pTaskInfo->streamInfo.submit.msgStr == NULL));
ASSERT(pTaskInfo->streamInfo.submit.msgStr == NULL); qDebug("set the submit block for future scan");
pTaskInfo->streamInfo.submit = submit; pTaskInfo->streamInfo.submit = submit;
return 0; return 0;
} }
...@@ -1047,14 +1048,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1047,14 +1048,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.prepareStatus = *pOffset;
pTaskInfo->streamInfo.returned = 0; pTaskInfo->streamInfo.returned = 0;
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
return 0; return 0;
} }
if (subType == TOPIC_SUB_TYPE__COLUMN) { if (subType == TOPIC_SUB_TYPE__COLUMN) {
uint16_t type = pOperator->operatorType;
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
// TODO add more check // TODO add more check
if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
ASSERT(pOperator->numOfDownstream == 1); ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0]; pOperator = pOperator->pDownstream[0];
} }
...@@ -1064,11 +1067,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1064,11 +1067,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader); tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
// let's seek to the next version in wal file
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
return -1; return -1;
} }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
// those data are from the snapshot in tsdb, besides the data in the wal file.
int64_t uid = pOffset->uid; int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts; int64_t ts = pOffset->ts;
...@@ -1127,7 +1132,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1127,7 +1132,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
ts, pTableScanInfo->currentTable, numOfTables); ts, pTableScanInfo->currentTable, numOfTables);
/*}*/
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -1170,7 +1174,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1170,7 +1174,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema; pTaskInfo->streamInfo.schema = mtInfo.schema;
qDebug("tmqsnap qStreamPrepareScan snapshot data uid %" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
...@@ -1178,7 +1182,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1178,7 +1182,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
return -1; return -1;
} }
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
} else if (pOffset->type == TMQ_OFFSET__LOG) { } else if (pOffset->type == TMQ_OFFSET__LOG) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
tsdbReaderClose(pInfo->dataReader); tsdbReaderClose(pInfo->dataReader);
......
...@@ -1562,7 +1562,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1562,7 +1562,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("queue scan called"); qDebug("start to exec queue scan");
if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) {
...@@ -1587,7 +1587,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1587,7 +1587,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SSDataBlock block = {0}; SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue; continue;
} }
......
...@@ -2101,9 +2101,9 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { ...@@ -2101,9 +2101,9 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
pVal->pz = taosMemoryMalloc(pVal->nLen + VARSTR_HEADER_SIZE + 1); pVal->pz = taosMemoryMalloc(pVal->nLen + 1);
memcpy(pVal->pz, pNode->datum.p, pVal->nLen + VARSTR_HEADER_SIZE); memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
pVal->pz[pVal->nLen + VARSTR_HEADER_SIZE] = 0; pVal->pz[pVal->nLen] = 0;
break; break;
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON:
pVal->nLen = getJsonValueLen(pNode->datum.p); pVal->nLen = getJsonValueLen(pNode->datum.p);
......
...@@ -50,11 +50,6 @@ uint32_t taosDJB2Hash(const char *key, uint32_t len) { ...@@ -50,11 +50,6 @@ uint32_t taosDJB2Hash(const char *key, uint32_t len) {
return hash; return hash;
} }
uint32_t xxHash(const char *key, uint32_t len) {
int32_t seed = 0xcc9e2d51;
return XXH32(key, len, seed);
}
uint32_t MurmurHash3_32(const char *key, uint32_t len) { uint32_t MurmurHash3_32(const char *key, uint32_t len) {
const uint8_t *data = (const uint8_t *)key; const uint8_t *data = (const uint8_t *)key;
const int32_t nblocks = len >> 2u; const int32_t nblocks = len >> 2u;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册