提交 219d966b 编写于 作者: L Liu Jicong

fix(wal): read after rolling

上级 9189f886
...@@ -55,7 +55,7 @@ enum { ...@@ -55,7 +55,7 @@ enum {
enum { enum {
STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__TABLE_SCAN, // STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN, STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__TRIGGER, STREAM_INPUT__TRIGGER,
......
...@@ -124,6 +124,7 @@ typedef struct SWal { ...@@ -124,6 +124,7 @@ typedef struct SWal {
typedef struct { typedef struct {
int8_t scanUncommited; int8_t scanUncommited;
int8_t scanMeta; int8_t scanMeta;
int8_t enableRef;
} SWalFilterCond; } SWalFilterCond;
typedef struct { typedef struct {
......
...@@ -901,6 +901,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -901,6 +901,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) { if (pTmq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
return NULL; return NULL;
} }
...@@ -917,6 +919,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -917,6 +919,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->delayedTask = taosOpenQueue(); pTmq->delayedTask = taosOpenQueue();
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) { if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
goto FAIL; goto FAIL;
} }
...@@ -943,12 +947,14 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -943,12 +947,14 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init semaphore // init semaphore
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) { if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
goto FAIL; goto FAIL;
} }
// init connection // init connection
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) { if (pTmq->pTscObj == NULL) {
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
tsem_destroy(&pTmq->rspSem); tsem_destroy(&pTmq->rspSem);
goto FAIL; goto FAIL;
} }
......
...@@ -55,7 +55,7 @@ int32_t tsNumOfMnodeQueryThreads = 2; ...@@ -55,7 +55,7 @@ int32_t tsNumOfMnodeQueryThreads = 2;
int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 2; int32_t tsNumOfVnodeQueryThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 1; int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeMergeThreads = 2; int32_t tsNumOfVnodeMergeThreads = 2;
...@@ -190,7 +190,6 @@ int32_t tsMqRebalanceInterval = 2; ...@@ -190,7 +190,6 @@ int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 60; int32_t tsTtlPushInterval = 60;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN); tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
tsDiskCfg[index].level = level; tsDiskCfg[index].level = level;
...@@ -470,7 +469,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -470,7 +469,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400*365, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
...@@ -636,7 +635,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -636,7 +635,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
return 0; return 0;
} }
int32_t taosSetCfg(SConfig *pCfg, char* name) { int32_t taosSetCfg(SConfig *pCfg, char *name) {
int32_t len = strlen(name); int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
...@@ -1006,7 +1005,6 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) { ...@@ -1006,7 +1005,6 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
return 0; return 0;
} }
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) { const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
if (tsCfg == NULL) osDefaultInit(); if (tsCfg == NULL) osDefaultInit();
......
...@@ -15,11 +15,11 @@ ...@@ -15,11 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndPrivilege.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndOffset.h" #include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndSubscribe.h" #include "mndSubscribe.h"
...@@ -435,17 +435,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -435,17 +435,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto SUBSCRIBE_OVER; goto SUBSCRIBE_OVER;
} }
#if 0
// ref topic to prevent drop
// TODO make topic complete
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
mInfo("subscribe topic %s by consumer:%" PRId64 ",cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
topicObj.refConsumerCnt);
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
#endif
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
} }
...@@ -472,8 +461,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -472,8 +461,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t status = atomic_load_32(&pConsumerOld->status); int32_t status = atomic_load_32(&pConsumerOld->status);
mInfo("receive subscribe request from old consumer:%" PRId64 ", current status: %s", consumerId, mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d",
mndConsumerStatusName(status)); consumerId, mndConsumerStatusName(status), newTopicNum);
if (status != MQ_CONSUMER_STATUS__READY) { if (status != MQ_CONSUMER_STATUS__READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
...@@ -849,12 +838,15 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -849,12 +838,15 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
mDebug("showing consumer %ld no assigned topic, skip", pConsumer->consumerId);
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
continue; continue;
} }
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
mDebug("showing consumer %ld", pConsumer->consumerId);
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
bool hasTopic = true; bool hasTopic = true;
if (topicSz == 0) { if (topicSz == 0) {
......
...@@ -512,9 +512,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -512,9 +512,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
}
pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
...@@ -524,6 +521,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -524,6 +521,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
} }
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
} }
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
......
...@@ -174,28 +174,9 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ...@@ -174,28 +174,9 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
#endif #endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId];
ASSERT(task);
qSetStreamInput(task, pReq, STREAM_INPUT__DATA_SUBMIT, false);
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
tqAddBlockDataToRsp(pDataBlock, pRsp); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
}
pRsp->blockNum++;
}
} else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReader* pReader = pExec->pExecReader[workerId]; STqReader* pReader = pExec->pExecReader[workerId];
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
...@@ -232,9 +213,11 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -232,9 +213,11 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
pRsp->blockNum++; pRsp->blockNum++;
} }
} }
if (pRsp->blockNum == 0) { if (pRsp->blockNum == 0) {
pRsp->skipLogNum++; pRsp->skipLogNum++;
return -1; return -1;
} }
return 0; return 0;
} }
...@@ -60,9 +60,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -60,9 +60,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p); taosArrayPush(pInfo->pBlockLists, &p);
} }
} else if (type == STREAM_INPUT__TABLE_SCAN) { /*} else if (type == STREAM_INPUT__TABLE_SCAN) {*/
// do nothing /*ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);*/
ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -71,6 +70,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -71,6 +70,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
} }
#if 0
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
...@@ -78,6 +78,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { ...@@ -78,6 +78,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL); return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
} }
#endif
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid); return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
......
...@@ -299,7 +299,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { ...@@ -299,7 +299,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
} }
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version); ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
pInfo->blockType = STREAM_INPUT__TABLE_SCAN; /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid; int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts; int64_t ts = pOffset->ts;
......
...@@ -1163,6 +1163,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32 ...@@ -1163,6 +1163,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
uidCol[i] = getGroupId(pOperator, uidCol[i]); uidCol[i] = getGroupId(pOperator, uidCol[i]);
} }
} }
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) { static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp; SOperatorInfo* pOperator = pInfo->pStreamScanOp;
...@@ -1256,13 +1257,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1256,13 +1257,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (setBlockIntoRes(pInfo, &ret.data) < 0) { if (setBlockIntoRes(pInfo, &ret.data) < 0) {
ASSERT(0); ASSERT(0);
} }
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/ // TODO clean data block
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
} else {
// data is filtered out, do clean
/*tDeleteSSDataBlock(&ret.data);*/
} }
} else if (ret.fetchType == FETCH_TYPE__META) { } else if (ret.fetchType == FETCH_TYPE__META) {
ASSERT(0); ASSERT(0);
...@@ -1280,6 +1277,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1280,6 +1277,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL; return pResult && pResult->info.rows > 0 ? pResult : NULL;
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
// TODO scan meta
ASSERT(0);
return NULL;
} }
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
...@@ -1437,6 +1438,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1437,6 +1438,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
} }
static SSDataBlock* doRawScan(SOperatorInfo* pInfo) {
//
return NULL;
}
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
...@@ -1449,6 +1455,19 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { ...@@ -1449,6 +1455,19 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
return tableIdList; return tableIdList;
} }
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) {
// create operator
// create tb reader
// create meta reader
// create tq reader
return NULL;
}
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
uint64_t taskId) { uint64_t taskId) {
...@@ -1492,16 +1511,16 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1492,16 +1511,16 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle) { if (pHandle) {
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info; STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
if (pHandle->version > 0) { if (pHandle->version > 0) {
pSTInfo->cond.endVersion = pHandle->version; pTSInfo->cond.endVersion = pHandle->version;
} }
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pSTInfo->dataReader = NULL; pTSInfo->dataReader = NULL;
if (tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, NULL) < 0) { if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
ASSERT(0); ASSERT(0);
} }
} }
...@@ -1515,14 +1534,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1515,14 +1534,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->tqReader = pHandle->tqReader; pInfo->tqReader = pHandle->tqReader;
} }
if (pSTInfo->interval.interval > 0) { if (pTSInfo->interval.interval > 0) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pTwSup->waterMark);
} else { } else {
pInfo->pUpdateInfo = NULL; pInfo->pUpdateInfo = NULL;
} }
pInfo->pTableScanOp = pTableScanOp; pInfo->pTableScanOp = pTableScanOp;
pInfo->interval = pSTInfo->interval; pInfo->interval = pTSInfo->interval;
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
......
...@@ -30,19 +30,24 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { ...@@ -30,19 +30,24 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
pRead->pWal = pWal; pRead->pWal = pWal;
pRead->pIdxFile = NULL; pRead->pIdxFile = NULL;
pRead->pLogFile = NULL; pRead->pLogFile = NULL;
pRead->curVersion = -5; pRead->curVersion = -1;
pRead->curFileFirstVer = -1; pRead->curFileFirstVer = -1;
pRead->curInvalid = 1; pRead->curInvalid = 1;
pRead->capacity = 0; pRead->capacity = 0;
if (cond) if (cond) {
pRead->cond = *cond; pRead->cond = *cond;
else { } else {
pRead->cond.scanMeta = 0; pRead->cond.scanMeta = 0;
pRead->cond.scanUncommited = 0; pRead->cond.scanUncommited = 0;
pRead->cond.enableRef = 0;
} }
taosThreadMutexInit(&pRead->mutex, NULL); taosThreadMutexInit(&pRead->mutex, NULL);
/*if (pRead->cond.enableRef) {*/
/*walOpenRef(pWal);*/
/*}*/
pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead)); pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
if (pRead->pHead == NULL) { if (pRead->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
...@@ -151,24 +156,8 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { ...@@ -151,24 +156,8 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
return 0; return 0;
} }
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal; SWal *pWal = pRead->pWal;
if (!pRead->curInvalid && ver == pRead->curVersion) {
wDebug("wal version %ld match, no need to reset", ver);
return 0;
}
pRead->curInvalid = 1;
pRead->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (ver < pWal->vers.snapshotVer) {
}
SWalFileInfo tmpInfo; SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
...@@ -190,6 +179,31 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { ...@@ -190,6 +179,31 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver); wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver);
pRead->curVersion = ver; pRead->curVersion = ver;
return 0;
}
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal;
if (!pRead->curInvalid && ver == pRead->curVersion) {
wDebug("wal version %ld match, no need to reset", ver);
return 0;
}
pRead->curInvalid = 1;
pRead->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (ver < pWal->vers.snapshotVer) {
}
if (walReadSeekVerImpl(pRead, ver) < 0) {
return -1;
}
return 0; return 0;
} }
...@@ -198,6 +212,8 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity ...@@ -198,6 +212,8 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen; int64_t contLen;
bool seeked = false;
if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (pRead->curInvalid || pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) { if (walReadSeekVer(pRead, fetchVer) < 0) {
ASSERT(0); ASSERT(0);
...@@ -205,9 +221,17 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -205,9 +221,17 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
pRead->curInvalid = 1; pRead->curInvalid = 1;
return -1; return -1;
} }
seeked = true;
} }
while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen != sizeof(SWalCkHead)) { if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, fetchVer);
seeked = true;
continue;
} else {
if (contLen < 0) { if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} else { } else {
...@@ -217,6 +241,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -217,6 +241,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
pRead->curInvalid = 1; pRead->curInvalid = 1;
return -1; return -1;
} }
}
return 0; return 0;
} }
...@@ -379,20 +404,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -379,20 +404,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
} }
int32_t walReadVer(SWalReader *pRead, int64_t ver) { int32_t walReadVer(SWalReader *pRead, int64_t ver) {
int64_t code; int64_t contLen;
bool seeked = false;
if (pRead->pWal->vers.firstVer == -1) { if (pRead->pWal->vers.firstVer == -1) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
if (pRead->curInvalid || pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
return -1;
}
}
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
...@@ -400,21 +419,35 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -400,21 +419,35 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
return -1; return -1;
} }
ASSERT(taosValidFile(pRead->pLogFile) == true); if (pRead->curInvalid || pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
return -1;
}
seeked = true;
}
code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); while (1) {
if (code != sizeof(SWalCkHead)) { contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (code < 0) if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, ver);
seeked = true;
continue;
} else {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
} }
ASSERT(0);
return -1; return -1;
} }
}
code = walValidHeadCksum(pRead->pHead); contLen = walValidHeadCksum(pRead->pHead);
if (code != 0) { if (contLen != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
...@@ -430,9 +463,9 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -430,9 +463,9 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
pRead->capacity = pRead->pHead->head.bodyLen; pRead->capacity = pRead->pHead->head.bodyLen;
} }
if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) { pRead->pHead->head.bodyLen) {
if (code < 0) if (contLen < 0)
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
else { else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
...@@ -449,8 +482,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -449,8 +482,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
return -1; return -1;
} }
code = walValidBodyCksum(pRead->pHead); contLen = walValidBodyCksum(pRead->pHead);
if (code != 0) { if (contLen != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册