提交 6fcee0e2 编写于 作者: L Liu Jicong

refactor(tsdb): tsdb reader open

上级 73b8f02a
......@@ -116,6 +116,8 @@ typedef struct SQueryTableDataCond {
int32_t type; // data block load type:
int32_t numOfTWindows;
STimeWindow* twindows;
int32_t startVersion;
int32_t endVersion;
} SQueryTableDataCond;
void* blockDataDestroy(SSDataBlock* pBlock);
......
......@@ -484,6 +484,7 @@ typedef struct {
int64_t stbUid;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*>
char dbName[TSDB_DB_FNAME_LEN];
} SMqSubscribeObj;
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
......
......@@ -35,7 +35,7 @@
#define MND_CONSUMER_LOST_HB_CNT 3
static int8_t mqRebLock = 0;
static int8_t mqRebInExecCnt = 0;
static const char *mndConsumerStatusName(int status);
......@@ -76,15 +76,15 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
bool mndRebTryStart() {
int8_t old = atomic_val_compare_exchange_8(&mqRebLock, 0, 1);
int8_t old = atomic_val_compare_exchange_8(&mqRebInExecCnt, 0, 1);
return old == 0;
}
void mndRebEnd() { atomic_sub_fetch_8(&mqRebLock, 1); }
void mndRebEnd() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
void mndRebCntInc() { atomic_add_fetch_8(&mqRebLock, 1); }
void mndRebCntInc() { atomic_add_fetch_8(&mqRebInExecCnt, 1); }
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebLock, 1); }
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
......@@ -92,7 +92,6 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
ASSERT(pConsumer);
mInfo("receive consumer lost msg, consumer id %ld, status %s", pLostMsg->consumerId,
mndConsumerStatusName(pConsumer->status));
......@@ -106,7 +105,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) goto FAIL;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
......@@ -125,6 +124,14 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
ASSERT(pConsumer);
mInfo("receive consumer recover msg, consumer id %ld, status %s", pRecoverMsg->consumerId,
mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;
......@@ -844,10 +851,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
topicSz = 1;
}
if (numOfRows + topicSz > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
}
for (int32_t i = 0; i < topicSz; i++) {
if (numOfRows + topicSz > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
}
SColumnInfoData *pColInfo;
int32_t cols = 0;
......
......@@ -415,6 +415,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
}
pSubNew->unassignedVgs = taosArrayDeepCopy(pSub->unassignedVgs, (FCopy)tCloneSMqVgEp);
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
return pSubNew;
}
......@@ -445,6 +446,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
}
ASSERT(cnt == sz);
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
tlen += taosEncodeString(buf, pSub->dbName);
return tlen;
}
......@@ -467,6 +469,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
}
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
buf = taosDecodeStringTo(buf, pSub->dbName);
return (void *)buf;
}
......
......@@ -402,7 +402,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg);
mndTransSetDbName(pTrans, pOutput->pSub->dbName);
if (pTrans == NULL) return -1;
// make txn:
......@@ -547,6 +548,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosRLockLatch(&pTopic->lock);
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0);
taosRUnLockLatch(&pTopic->lock);
......
......@@ -87,30 +87,22 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
SNode *pAst = NULL;
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
ASSERT(0);
return false;
return -1;
}
SHashObj *pColHash = NULL;
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) goto NEXT;
if (pColHash == NULL) {
pColHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
}
if (pCol->colId > 0) {
taosHashPut(pColHash, &pCol->colId, sizeof(int16_t), NULL, 0);
if (pCol->colId > 0 && pCol->colId == colId) {
found = true;
goto NEXT;
}
mTrace("topic:%s, colId:%d is used", pTopic->name, pCol->colId);
}
if (taosHashGet(pColHash, &colId, sizeof(int16_t)) != NULL) {
found = true;
goto NEXT;
}
NEXT:
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
......@@ -563,7 +555,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer %ld from cgroup %s", dropReq.name,
mError("topic:%s, failed to drop since subscribed by consumer %ld in consumer group %s", dropReq.name,
pConsumer->consumerId, pConsumer->cgroup);
return -1;
}
......@@ -580,7 +572,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
#endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
mndTransSetDbName(pTrans, pTopic->db);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1;
......
......@@ -89,7 +89,7 @@ typedef struct SMetaFltParam {
tb_uid_t suid;
int16_t cid;
int16_t type;
char * val;
char *val;
bool reverse;
int (*filterFunc)(void *a, void *b, int16_t type);
......@@ -116,16 +116,16 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId);
tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
void *pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
void * tsdbGetIdx(SMeta *pMeta);
void * tsdbGetIvtIdx(SMeta *pMeta);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
......@@ -201,7 +201,7 @@ struct SMetaEntry {
int64_t version;
int8_t type;
tb_uid_t uid;
char * name;
char *name;
union {
struct {
SSchemaWrapper schemaRow;
......@@ -229,17 +229,17 @@ struct SMetaEntry {
struct SMetaReader {
int32_t flags;
SMeta * pMeta;
SMeta *pMeta;
SDecoder coder;
SMetaEntry me;
void * pBuf;
void *pBuf;
int32_t szBuf;
};
struct SMTbCursor {
TBC * pDbc;
void * pKey;
void * pVal;
TBC *pDbc;
void *pKey;
void *pVal;
int32_t kLen;
int32_t vLen;
SMetaReader mr;
......
......@@ -119,8 +119,8 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
......
......@@ -500,8 +500,8 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
return TSDB_CODE_SUCCESS;
}
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId) {
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) {
return NULL;
......@@ -642,7 +642,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL
return NULL;
}
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pList, qId, taskId);
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId);
if (pTsdbReadHandle == NULL) {
return NULL;
}
......
......@@ -2010,9 +2010,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
if (pCtx[j].increase) {
int64_t ts = *(int64_t*) in;
int64_t ts = *(int64_t*)in;
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char *)&ts, pCtx[j].resultInfo->isNullRes);
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
ts++;
}
} else {
......@@ -3112,8 +3112,8 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SSortedMergeOperatorInfo *pInfo) {
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
blockDataCleanup(pDataBlock);
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
......@@ -4575,7 +4575,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = {
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
......@@ -4685,8 +4685,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
pOptr =
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 8;
......@@ -4720,7 +4721,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfOutputCols = 0;
SArray* pColList =
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputDataBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pInputDataBlock, pResBlock, sortInfo, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
......@@ -5067,7 +5068,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto _error;
}
return tsdbQueryTables(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
return tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
_error:
terrno = code;
......@@ -5349,7 +5350,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size) {
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
size_t size) {
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
......@@ -5370,7 +5372,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
bufSize = pageSize * 4;
}
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
for(int32_t i = 0; i < numOfOutput; ++i) {
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf;
}
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册