diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 6204b9dcfca0be850d036f0eeb1687f7ba6148bb..2a0d4e7ff69e79b10f7c0f0d47aa818f918c8a38 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -71,8 +71,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); #define colDataGetData(p1_, r_) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_)) -#define IS_JSON_NULL(type, data) ((type) == TSDB_DATA_TYPE_JSON && \ - (*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data))) +#define IS_JSON_NULL(type, data) \ + ((type) == TSDB_DATA_TYPE_JSON && (*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data))) static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { if (!pColumnInfoData->hasNull) { @@ -186,7 +186,8 @@ int32_t getJsonValueLen(const char* data); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, uint32_t* capacity, const SColumnInfoData* pSource, uint32_t numOfRow2); -int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo); +int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, + const SDataBlockInfo* pBlockInfo); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); @@ -222,6 +223,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); +int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createDataBlock(); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 2509c106015682ff38c5de3d1965fa7b97d0d7a7..70056783ea18d337b121fd21bedc2294bbbed0e7 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -73,10 +73,12 @@ void mndStop(SMnode *pMnode); * @param pMnode The mnode object. * @param pCluster * @param pVgroup + * @param pStbInfo * @param pGrant * @return int32_t 0 for success, -1 for failure. */ -int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pCluster, SMonVgroupInfo *pVgroup, SMonGrantInfo *pGrant); +int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, + SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo); /** * @brief Get mnode loads for status msg. diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 6738fc23bc474270119a78fa67c4c777c8c696cc..00acc4741d78af3bebece1c04ed791e02b2e9fe9 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -36,7 +36,7 @@ typedef struct SReadHandle { void* vnode; void* mnd; SMsgCb* pMsgCb; - int8_t initTsdbReader; +// int8_t initTsdbReader; } SReadHandle; enum { diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 9e7aa4e27ff209eafaf7eea4f70e6bb085714a3e..41d80c1a83a1b01eae5bae71e150b33569706556 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -138,6 +138,15 @@ typedef struct { SArray *vgroups; // array of SMonVgroupDesc } SMonVgroupInfo; +typedef struct { + char stb_name[TSDB_TABLE_NAME_LEN]; + char database_name[TSDB_DB_NAME_LEN]; +} SMonStbDesc; + +typedef struct { + SArray *stbs; // array of SMonStbDesc +} SMonStbInfo; + typedef struct { int32_t expire_time; int64_t timeseries_used; @@ -147,6 +156,7 @@ typedef struct { typedef struct { SMonClusterInfo cluster; SMonVgroupInfo vgroup; + SMonStbInfo stb; SMonGrantInfo grant; SMonSysInfo sys; SMonLogs log; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7673e730356fcb75cfa38f41521b5c41d596d6e6..a47810e7c1a918ef4ce9795fdbd08bf45ce363ce 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -344,10 +344,12 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc if (pTask->sinkType == TASK_SINK__TABLE) { ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); + taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock); taosFreeQitem(pBlock); } else if (pTask->sinkType == TASK_SINK__SMA) { ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); + taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock); taosFreeQitem(pBlock); } else { ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a0c07f0c09fde9236fd7d5cf65b2596727261414..2bf49fa006e4530088f411ff817e105010888aee 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -157,13 +157,13 @@ typedef struct SSyncLogStore { SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); // refactor, log[0 .. n] ==> log[m .. n] - int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex); - int32_t (*syncLogResetBeginIndex)(struct SSyncLogStore* pLogStore); + // int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex); + SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore); bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore); - // bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index); + int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index); SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8317c6c79daedd59a99652b638ce40e64f4ebdaf..593f8c5c0bb55975d29957c2ea80349c152ac4fd 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -263,7 +263,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui pColumnInfoData->varmeta.length = len + oldLen; } else { if (finalNumOfRows > *capacity) { - ASSERT(finalNumOfRows*pColumnInfoData->info.bytes); + ASSERT(finalNumOfRows * pColumnInfoData->info.bytes); char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes); if (tmp == NULL) { return TSDB_CODE_VND_OUT_OF_MEMORY; @@ -293,7 +293,8 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui return numOfRow1 + numOfRow2; } -int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo) { +int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, + const SDataBlockInfo* pBlockInfo) { ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type); if (numOfRows <= 0) { return numOfRows; @@ -327,9 +328,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p return 0; } -size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { - return taosArrayGetSize(pBlock->pDataBlock); -} +size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); } size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } @@ -396,7 +395,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t pageSize) { ASSERT(pBlock != NULL && stopIndex != NULL); - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfRows = pBlock->info.rows; int32_t bitmapChar = 1; @@ -512,7 +511,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { // write the number of rows *(uint32_t*)buf = pBlock->info.rows; - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfRows = pBlock->info.rows; char* pStart = buf + sizeof(uint32_t); @@ -542,7 +541,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { pBlock->info.rows = *(int32_t*)buf; - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); const char* pStart = buf + sizeof(uint32_t); for (int32_t i = 0; i < numOfCols; ++i) { @@ -734,7 +733,7 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) { int32_t code = 0; - size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = &pDstCols[i]; @@ -794,7 +793,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { int32_t rows = pDataBlock->info.rows; - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData)); if (pCols == NULL) { @@ -902,7 +901,6 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { } else { // var data type } } else if (numOfCols == 2) { - } } @@ -1103,14 +1101,14 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { } } -static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo *pBlockInfo, uint32_t numOfRows) { +static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) { ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows); if (numOfRows < pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; } // todo temp disable it -// ASSERT(pColumn->info.bytes != 0); + // ASSERT(pColumn->info.bytes != 0); int32_t existedRows = pBlockInfo->rows; @@ -1141,7 +1139,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo * if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - + memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows)); pColumn->pData = tmp; } @@ -1197,6 +1195,40 @@ void* blockDataDestroy(SSDataBlock* pBlock) { taosMemoryFreeClear(pBlock); return NULL; } +int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { + ASSERT(src != NULL); + + dst->info = src->info; + dst->info.rows = 0; + dst->info.capacity = 0; + + size_t numOfCols = taosArrayGetSize(src->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(src->pDataBlock, i); + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; + blockDataAppendColInfo(dst, &colInfo); + } + + int32_t code = blockDataEnsureCapacity(dst, src->info.rows); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i); + if (pSrc->pData == NULL) { + continue; + } + + colDataAssign(pDst, pSrc, src->info.rows, &src->info); + } + + dst->info.rows = src->info.rows; + dst->info.capacity = src->info.rows; + return 0; +} SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { if (pDataBlock == NULL) { @@ -1272,7 +1304,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat } // todo disable it temporarily -// ASSERT(pColInfoData->info.type != 0); + // ASSERT(pColInfoData->info.type != 0); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { pBlock->info.hasVarCol = true; } @@ -1284,7 +1316,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) { SColumnInfoData col = {.hasNull = true}; col.info.colId = colId; - col.info.type = type; + col.info.type = type; col.info.bytes = bytes; return col; @@ -1552,9 +1584,9 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) { int32_t sz = taosArrayGetSize(dataBlocks); for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i); - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); - int32_t rows = pDataBlock->info.rows; + int32_t rows = pDataBlock->info.rows; printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId); for (int32_t j = 0; j < rows; j++) { printf("%s |", flag); @@ -1633,8 +1665,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; -// int32_t rowSize = pDataBlock->info.rowSize; -// int64_t groupId = pDataBlock->info.groupId; + // int32_t rowSize = pDataBlock->info.rowSize; + // int64_t groupId = pDataBlock->info.groupId; if (colNum <= 1) { // invalid if only with TS col diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index d44a7d79bfc98d765065fc6bfa19facac8e1d9ae..f9c54674f377d3f9dc2374334551346cb67d6d7c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -17,7 +17,7 @@ #include "mmInt.h" void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) { - mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant); + mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant); } int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 1d19bbd899d8d247c2e24374adf86a9bd6b062df..0213fcb5bd006473404366c67b805615caad7148 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -614,7 +614,7 @@ int64_t mndGenerateUid(char *name, int32_t len) { } int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo) { + SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) { if (mndAcquireRpcRef(pMnode) != 0) return -1; SSdb *pSdb = pMnode->pSdb; @@ -623,7 +623,9 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc)); pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc)); - if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) { + pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc)); + if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL || + pStbInfo->stbs == NULL) { mndReleaseRpcRef(pMnode); return -1; } @@ -714,6 +716,27 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr sdbRelease(pSdb, pVgroup); } + // stb info + pIter = NULL; + while (1) { + SStbObj *pStb = NULL; + pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb); + if (pIter == NULL) break; + + SMonStbDesc desc = {0}; + + SName name1 = {0}; + tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + tNameGetDbName(&name1, desc.database_name); + + SName name2 = {0}; + tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN); + + taosArrayPush(pStbInfo->stbs, &desc); + sdbRelease(pSdb, pStb); + } + // grant info pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f; pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cca3a5858814e54d32fb8965326b3786e8f3f642..ee640eb640fa08424d58eca1408a1bb2efaaedea 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -403,7 +403,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .reader = pHandle->execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, - .initTsdbReader = 1, +// .initTsdbReader = 1, }; pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); @@ -476,7 +476,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { .reader = pStreamReader, .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, - .initTsdbReader = 1, +// .initTsdbReader = 1, }; /*pTask->exec.inputHandle = pStreamReader;*/ pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index da052ee3eb7723ab3d23ffc348d09384801ed578..be8fef1249cae441fb267837e7729f3e7b87c7d7 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -66,6 +66,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp if (qStreamScanSnapshot(task) < 0) { ASSERT(0); } + // set version while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index e5475d7c300531b7b3e0d8cce369fa9eb0415930..e536e87032af55b529477f6b2051db07677b6902 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -44,6 +44,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { } char* fname = buildFileName(pStore->pTq->path); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); + taosMemoryFree(fname); if (pFile != NULL) { STqOffsetHead head = {0}; int64_t code; @@ -77,7 +78,6 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { } taosCloseFile(&pFile); - taosMemoryFree(fname); } return pStore; } @@ -102,6 +102,7 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { // TODO file name should be with a version char* fname = buildFileName(pStore->pTq->path); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + taosMemoryFree(fname); if (pFile == NULL) { ASSERT(0); return -1; @@ -140,6 +141,5 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { } // close and rename file taosCloseFile(&pFile); - taosMemoryFree(fname); return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5e5a4883a9d91f63601dca9db9da7252563a3164..5d2fb81cf0669e07f703fc0006416353ada7bc2d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -151,6 +151,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ int32_t sversion = htonl(pHandle->pBlock->sversion); if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion || pHandle->cachedSchemaSuid != pHandle->msgIter.suid) { + if (pHandle->pSchema) taosMemoryFree(pHandle->pSchema); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); if (pHandle->pSchema == NULL) { tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table", @@ -161,6 +162,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ } // this interface use suid instead of uid + if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true); if (pHandle->pSchemaWrapper == NULL) { tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table", @@ -184,7 +186,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ while (colMeta < pSchemaWrapper->nCols) { SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta]; SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { goto FAIL; } @@ -207,7 +209,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ colNeed++; } else { SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { goto FAIL; } @@ -251,8 +253,8 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ } return 0; -FAIL: // todo refactor here -// if (*ppCols) taosArrayDestroy(*ppCols); +FAIL: // todo refactor here + // if (*ppCols) taosArrayDestroy(*ppCols); return -1; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1e5d75655ce1ff5927ef13a363f7e9518a9898d4..a3170cd748fc648212331c111e900c7827733a35 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1341,7 +1341,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } if (rowRes != NULL) { - int32_t totalRows = pBlock->info.rows; + int32_t totalRows = pBlock->info.rows; SSDataBlock* px = createOneDataBlock(pBlock, true); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -4041,6 +4041,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); if(code){ + pTaskInfo->code = code; return NULL; } code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); @@ -4053,6 +4054,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; pTableListInfo->needSortTableByGroupId = true; @@ -4067,11 +4069,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId); + STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STimeWindowAggSupp twSup = { @@ -4085,9 +4090,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId); return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; @@ -4098,6 +4105,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); @@ -4486,18 +4494,14 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead (*pTaskInfo)->sql = sql; (*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond; (*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond; - (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, - &(*pTaskInfo)->tableqinfoList); + (*pTaskInfo)->pRoot = + createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList); + if (NULL == (*pTaskInfo)->pRoot) { code = (*pTaskInfo)->code; goto _complete; } - if ((*pTaskInfo)->pRoot == NULL) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _complete; - } - return code; _complete: diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ccd7bf04aaf5aadbbbc6e41e327b6a2efa72a4de..9c0ed40c3066a1b5c4f165becc376c02f5aee3d7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1012,9 +1012,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { while (tqNextDataBlock(pInfo->streamBlockReader)) { SSDataBlock block = {0}; - uint64_t groupId = 0; - uint64_t uid = 0; - int32_t numOfRows = 0; + uint64_t groupId = 0; + uint64_t uid = 0; + int32_t numOfRows = 0; // todo refactor int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader, &groupId, &uid, &numOfRows); @@ -1067,6 +1067,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } + // TODO refactor @liao + taosArrayDestroy(block.pDataBlock); + if (pInfo->pRes->pDataBlock == NULL) { // TODO add log pOperator->status = OP_EXEC_DONE; @@ -1106,12 +1109,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; + } else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) { SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); - if (pResult) { - return pResult->info.rows > 0 ? pResult : NULL; - } - return NULL; + return pResult && pResult->info.rows > 0 ? pResult : NULL; + } else { ASSERT(0); return NULL; @@ -1387,7 +1389,8 @@ static SSDataBlock* buildSysTableMetaBlock() { SSDataBlock* pBlock = createDataBlock(); for (int32_t i = 0; i < pMeta[index].colNum; ++i) { - SColumnInfoData colInfoData = createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1); + SColumnInfoData colInfoData = + createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1); blockDataAppendColInfo(pBlock, &colInfoData); } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index f985cc324d809cd0937e2d886dfff55c1f29b2e3..69fff2ad1811077fcc15ddec52f3d47499c699b2 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -79,7 +79,8 @@ static int32_t translateIn2NumOutDou(SFunctionNode* pFunc, char* pErrBuf, int32_ uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; - if (!IS_NUMERIC_TYPE(para1Type) || !IS_NUMERIC_TYPE(para2Type)) { + if ((!IS_NUMERIC_TYPE(para1Type) && !IS_NULL_TYPE(para1Type)) || + (!IS_NUMERIC_TYPE(para2Type) && !IS_NULL_TYPE(para2Type))) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } @@ -109,13 +110,13 @@ static int32_t translateLogarithm(SFunctionNode* pFunc, char* pErrBuf, int32_t l } uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (!IS_NUMERIC_TYPE(para1Type)) { + if (!IS_NUMERIC_TYPE(para1Type) && !IS_NULL_TYPE(para1Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } if (2 == numOfParams) { uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; - if (!IS_NUMERIC_TYPE(para2Type)) { + if (!IS_NUMERIC_TYPE(para2Type) && !IS_NULL_TYPE(para2Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } } diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index 4d094e59314b726588204056a738175e7441e8a0..d387a432e13d12a55a03af089aab2228f6863ab0 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -280,6 +280,27 @@ static void monGenVgroupJson(SMonInfo *pMonitor) { } } +static void monGenStbJson(SMonInfo *pMonitor) { + SMonStbInfo *pInfo = &pMonitor->mmInfo.stb; + if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; + + SJson *pJson = tjsonAddArrayToObject(pMonitor->pJson, "stb_infos"); + if (pJson == NULL) return; + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->stbs); ++i) { + SJson *pStbJson = tjsonCreateObject(); + if (pStbJson == NULL) continue; + if (tjsonAddItemToArray(pJson, pStbJson) != 0) { + tjsonDelete(pStbJson); + continue; + } + + SMonStbDesc *pStbDesc = taosArrayGet(pInfo->stbs, i); + tjsonAddStringToObject(pStbJson, "stb_name", pStbDesc->stb_name); + tjsonAddStringToObject(pStbJson, "database_name", pStbDesc->database_name); + } +} + static void monGenGrantJson(SMonInfo *pMonitor) { SMonGrantInfo *pInfo = &pMonitor->mmInfo.grant; if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; @@ -527,6 +548,7 @@ void monSendReport() { monGenBasicJson(pMonitor); monGenClusterJson(pMonitor); monGenVgroupJson(pMonitor); + monGenStbJson(pMonitor); monGenGrantJson(pMonitor); monGenDnodeJson(pMonitor); monGenDiskJson(pMonitor); diff --git a/source/libs/monitor/src/monMsg.c b/source/libs/monitor/src/monMsg.c index ced493d5b74774aa4c2b79ba0b2224d3faee7406..8fa7e8860509ca473ad41b1c89efbf430f0c2649 100644 --- a/source/libs/monitor/src/monMsg.c +++ b/source/libs/monitor/src/monMsg.c @@ -209,6 +209,32 @@ int32_t tDecodeSMonVgroupInfo(SDecoder *decoder, SMonVgroupInfo *pInfo) { return 0; } +int32_t tEncodeSMonStbInfo(SEncoder *encoder, const SMonStbInfo *pInfo) { + if (tEncodeI32(encoder, taosArrayGetSize(pInfo->stbs)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->stbs); ++i) { + SMonStbDesc *pDesc = taosArrayGet(pInfo->stbs, i); + if (tEncodeCStr(encoder, pDesc->stb_name) < 0) return -1; + if (tEncodeCStr(encoder, pDesc->database_name) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSMonStbInfo(SDecoder *decoder, SMonStbInfo *pInfo) { + int32_t arraySize = 0; + if (tDecodeI32(decoder, &arraySize) < 0) return -1; + + pInfo->stbs = taosArrayInit(arraySize, sizeof(SMonStbDesc)); + if (pInfo->stbs == NULL) return -1; + + for (int32_t i = 0; i < arraySize; ++i) { + SMonStbDesc desc = {0}; + if (tDecodeCStrTo(decoder, desc.stb_name) < 0) return -1; + if (tDecodeCStrTo(decoder, desc.database_name) < 0) return -1; + taosArrayPush(pInfo->stbs, &desc); + } + return 0; +} + int32_t tEncodeSMonGrantInfo(SEncoder *encoder, const SMonGrantInfo *pInfo) { if (tEncodeI32(encoder, pInfo->expire_time) < 0) return -1; if (tEncodeI64(encoder, pInfo->timeseries_used) < 0) return -1; @@ -230,6 +256,7 @@ int32_t tSerializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) { if (tStartEncode(&encoder) < 0) return -1; if (tEncodeSMonClusterInfo(&encoder, &pInfo->cluster) < 0) return -1; if (tEncodeSMonVgroupInfo(&encoder, &pInfo->vgroup) < 0) return -1; + if (tEncodeSMonStbInfo(&encoder, &pInfo->stb) < 0) return -1; if (tEncodeSMonGrantInfo(&encoder, &pInfo->grant) < 0) return -1; if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; if (tEncodeSMonLogs(&encoder, &pInfo->log) < 0) return -1; @@ -247,6 +274,7 @@ int32_t tDeserializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeSMonClusterInfo(&decoder, &pInfo->cluster) < 0) return -1; if (tDecodeSMonVgroupInfo(&decoder, &pInfo->vgroup) < 0) return -1; + if (tDecodeSMonStbInfo(&decoder, &pInfo->stb) < 0) return -1; if (tDecodeSMonGrantInfo(&decoder, &pInfo->grant) < 0) return -1; if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; @@ -261,9 +289,11 @@ void tFreeSMonMmInfo(SMonMmInfo *pInfo) { taosArrayDestroy(pInfo->cluster.mnodes); taosArrayDestroy(pInfo->cluster.dnodes); taosArrayDestroy(pInfo->vgroup.vgroups); + taosArrayDestroy(pInfo->stb.stbs); pInfo->cluster.mnodes = NULL; pInfo->cluster.dnodes = NULL; pInfo->vgroup.vgroups = NULL; + pInfo->stb.stbs = NULL; pInfo->log.logs = NULL; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index c2551392b3e2c7f2425afbb6c3b70b62b49c8744..f78bb42e0eb022bb66c93048c1682f4dd59e5161 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1061,6 +1061,7 @@ static EDealRes partTagsOptRebuildTbanmeImpl(SNode** pNode, void* pContext) { } strcpy(pFunc->functionName, "tbname"); pFunc->funcType = FUNCTION_TYPE_TBNAME; + pFunc->node.resType = ((SColumnNode*)*pNode)->node.resType; nodesDestroyNode(*pNode); *pNode = (SNode*)pFunc; return DEAL_RES_IGNORE_CHILD; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 7ea0457592824b7f26fe823f8f016fd98a90d4f7..d7f0733129b37144c4661fd612e6769d2a232397 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -36,9 +36,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu SColumnInfoData *pOutputData = pOutput->columnData; int32_t type = GET_PARAM_TYPE(pInput); - if (!IS_NUMERIC_TYPE(type)) { - return TSDB_CODE_FAILED; - } switch (type) { case TSDB_DATA_TYPE_FLOAT: { @@ -119,6 +116,13 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu break; } + case TSDB_DATA_TYPE_NULL: { + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + colDataAppendNULL(pOutputData, i); + } + break; + } + default: { colDataAssign(pOutputData, pInputData, pInput->numOfRows, NULL); } @@ -130,9 +134,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) { int32_t type = GET_PARAM_TYPE(pInput); - if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { - return TSDB_CODE_FAILED; - } SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; @@ -142,7 +143,7 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS double *out = (double *)pOutputData->pData; for (int32_t i = 0; i < pInput->numOfRows; ++i) { - if (colDataIsNull_s(pInputData, i)) { + if (colDataIsNull_s(pInputData, i) || IS_NULL_TYPE(type)) { colDataAppendNULL(pOutputData, i); continue; } @@ -159,10 +160,6 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS } static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) { - if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) { - return TSDB_CODE_FAILED; - } - SColumnInfoData *pInputData[2]; SColumnInfoData *pOutputData = pOutput->columnData; _getDoubleValue_fn_t getValueFn[2]; @@ -175,11 +172,15 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S double *out = (double *)pOutputData->pData; double result; + bool hasNullType = (IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[0])) || + IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[1]))); + int32_t numOfRows = TMAX(pInput[0].numOfRows, pInput[1].numOfRows); if (pInput[0].numOfRows == pInput[1].numOfRows) { for (int32_t i = 0; i < numOfRows; ++i) { if (colDataIsNull_s(pInputData[0], i) || - colDataIsNull_s(pInputData[1], i)) { + colDataIsNull_s(pInputData[1], i) || + hasNullType) { colDataAppendNULL(pOutputData, i); continue; } @@ -191,7 +192,7 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S } } } else if (pInput[0].numOfRows == 1) { //left operand is constant - if (colDataIsNull_s(pInputData[0], 0)) { + if (colDataIsNull_s(pInputData[0], 0) || hasNullType) { colDataAppendNNULL(pOutputData, 0, pInput[1].numOfRows); } else { for (int32_t i = 0; i < numOfRows; ++i) { @@ -210,7 +211,7 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S } } } else if (pInput[1].numOfRows == 1) { - if (colDataIsNull_s(pInputData[1], 0)) { + if (colDataIsNull_s(pInputData[1], 0) || hasNullType) { colDataAppendNNULL(pOutputData, 0, pInput[0].numOfRows); } else { for (int32_t i = 0; i < numOfRows; ++i) { @@ -236,9 +237,6 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) { int32_t type = GET_PARAM_TYPE(pInput); - if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { - return TSDB_CODE_FAILED; - } SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; @@ -272,6 +270,13 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP break; } + case TSDB_DATA_TYPE_NULL: { + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + colDataAppendNULL(pOutputData, i); + } + break; + } + default: { colDataAssign(pOutputData, pInputData, pInput->numOfRows, NULL); } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 9654e21d24a3768a69d8452715fd56fac73b2f2d..2f41c083549d0cc6f060fe888df5fd697099aa95 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -34,7 +34,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); -int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); +int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b5a9e221a74195e4799efab35ae3d2e52bf09efa..11cd089606a2ff33d256ed6916524ef6f4dd0541 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -172,7 +172,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->numOfRows = htonl(pBlock->info.rows); - int32_t numOfCols = (int32_t) taosArrayGetSize(pBlock->pDataBlock); + int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); pRetrieve->numOfCols = htonl(numOfCols); int32_t actualLen = 0; @@ -185,7 +185,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis return 0; } -int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { +int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { void* buf = NULL; int32_t code = -1; int32_t blockNum = taosArrayGetSize(data->blocks); @@ -307,6 +307,8 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return -1; } + taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock); + taosFreeQitem(pBlock); tmsgSendReq(pEpSet, &dispatchMsg); return 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6fada59c84e38392db20e6862bb847468de46eb2..cb7dd241c1912d0f5e2dca88be9562e3fca095e3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -53,9 +53,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } // TODO: do we need free memory? - SSDataBlock* outputCopy = createOneDataBlock(output, true); - outputCopy->info.childId = pTask->selfChildId; - taosArrayPush(pRes, outputCopy); + SSDataBlock block = {0}; + assignOneDataBlock(&block, output); + block.info.childId = pTask->selfChildId; + taosArrayPush(pRes, &block); + /*SSDataBlock* outputCopy = createOneDataBlock(output, true);*/ + /*outputCopy->info.childId = pTask->selfChildId;*/ + /*taosArrayPush(pRes, outputCopy);*/ } return 0; } @@ -68,6 +72,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { streamTaskExecImpl(pTask, data, pRes); if (pTask->taskStatus == TASK_STATUS__DROPPING) { + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); return NULL; } @@ -82,25 +87,25 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { qRes->blocks = pRes; if (streamTaskOutput(pTask, qRes) < 0) { streamQueueProcessFail(pTask->inputQueue); - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); taosFreeQitem(qRes); return NULL; } - - int8_t type = ((SStreamQueueItem*)data)->type; - if (type == STREAM_INPUT__TRIGGER) { - blockDataDestroy(((SStreamTrigger*)data)->pBlock); - taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_BLOCK) { - taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); - taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->isDataScan); - streamDataSubmitRefDec((SStreamDataSubmit*)data); - taosFreeQitem(data); - } streamQueueProcessSuccess(pTask->inputQueue); - return taosArrayInit(0, sizeof(SSDataBlock)); + pRes = taosArrayInit(0, sizeof(SSDataBlock)); + } + + int8_t type = ((SStreamQueueItem*)data)->type; + if (type == STREAM_INPUT__TRIGGER) { + blockDataDestroy(((SStreamTrigger*)data)->pBlock); + taosFreeQitem(data); + } else if (type == STREAM_INPUT__DATA_BLOCK) { + taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); + taosFreeQitem(data); + } else if (type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->isDataScan); + streamDataSubmitRefDec((SStreamDataSubmit*)data); + taosFreeQitem(data); } } return pRes; @@ -125,14 +130,14 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { pRes = streamExecForQall(pTask, pRes); if (pRes == NULL) goto FAIL; - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); return 0; } else if (execStatus == TASK_EXEC_STATUS__CLOSING) { continue; } else if (execStatus == TASK_EXEC_STATUS__EXECUTING) { ASSERT(taosArrayGetSize(pRes) == 0); - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); return 0; } else { ASSERT(0); diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 4f91990ccf29a93cdd7b9d4fd4f2e16bfc3df480..2e8e0e1227c81a0faf9c828eb8f403367495b408 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -28,13 +28,13 @@ extern "C" { #include "trpc.h" #include "ttimer.h" -#define TIMER_MAX_MS 0x7FFFFFFF -#define ENV_TICK_TIMER_MS 1000 -#define PING_TIMER_MS 1000 -#define ELECT_TIMER_MS_MIN 500 -#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) +#define TIMER_MAX_MS 0x7FFFFFFF +#define ENV_TICK_TIMER_MS 1000 +#define PING_TIMER_MS 1000 +#define ELECT_TIMER_MS_MIN 1300 +#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 100 +#define HEARTBEAT_TIMER_MS 900 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index aec1f77b420b595b0d0393e02f472de7f6eda101..f124867097c060fc89ffef405bde8ba7f547fb5b 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -26,11 +26,13 @@ extern "C" { #include "syncInt.h" #include "syncRaftEntry.h" #include "taosdef.h" +#include "wal.h" typedef struct SSyncLogStoreData { - SSyncNode* pSyncNode; - SWal* pWal; - SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0 + SSyncNode* pSyncNode; + SWal* pWal; + SWalReadHandle* pWalHandle; + // SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0 } SSyncLogStoreData; SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index bc5342cc7e1fc441706f3758c50ec45ada5981b6..47d0976aca483ba40480fa730759fe3d1206f568 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -420,44 +420,26 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { // prevLogIndex == -1 static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) { - if (gRaftDetailLog) { - sTrace("syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld", pMsg->prevLogIndex); - } return true; } SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); if (pMsg->prevLogIndex > myLastIndex) { - if (gRaftDetailLog) { - sTrace("syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld", pMsg->prevLogIndex, - myLastIndex); - } + sDebug("vgId:%d sync log not ok, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex); return false; } SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); if (myPreLogTerm == SYNC_TERM_INVALID) { - sError("vgId:%d sync get pre term error, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex); + sDebug("vgId:%d sync log not ok2, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex); return false; } if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { - if (gRaftDetailLog) { - sTrace( - "syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, " - "myPreLogTerm:%lu", - pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm); - } return true; } - if (gRaftDetailLog) { - sTrace( - "syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, " - "myPreLogTerm:%lu", - pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm); - } - + sDebug("vgId:%d sync log not ok3, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex); return false; } @@ -466,14 +448,11 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs int32_t code = 0; // print log - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, vgId:%d, term:%lu", ths->vgId, - ths->pRaftStore->currentTerm); - syncAppendEntriesLog2(logBuf, pMsg); + syncAppendEntriesLog2("==syncNodeOnAppendEntriesSnapshotCb==", pMsg); // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - sInfo("recv SyncAppendEntries maybe replica already dropped"); + syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped"); return ret; } @@ -497,7 +476,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs do { bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; if (condition) { - sTrace("recv SyncAppendEntries, candidate to follower"); + syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower"); syncNodeBecomeFollower(ths, "from candidate by append entries"); // do not reply? @@ -505,6 +484,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } } while (0); +#if 0 // fake match // // condition1: @@ -530,7 +510,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && syncNodeHasSnapshot(ths); bool condition1 = - condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex); + condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex); // donot use syncLogEntryCount!!! use isEmpty bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) && (pMsg->prevLogIndex > myLastIndex); bool condition3 = condition0 && (pMsg->prevLogIndex < snapshot.lastApplyIndex); @@ -538,11 +518,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition = condition1 || condition2 || condition3 || condition4; if (condition) { - sTrace( - "recv SyncAppendEntries, fake match, myLastIndex:%ld, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, " - "condition1:%d, condition2:%d, condition3:%d, condition4:%d", - myLastIndex, ths->pLogStore->syncLogBeginIndex(ths->pLogStore), - ths->pLogStore->syncLogEndIndex(ths->pLogStore), condition1, condition2, condition3, condition4); + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, fake match, pre-index:%ld, pre-term:%lu", + pMsg->prevLogIndex, pMsg->prevLogTerm); + syncNodeEventLog(ths, logBuf); // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); @@ -562,6 +541,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } } while (0); +#endif // fake match2 // @@ -576,8 +556,13 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && (pMsg->prevLogIndex <= ths->commitIndex); if (condition) { - sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex, - ths->commitIndex); + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries, fake match2, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex, + pMsg->prevLogTerm, pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + } while (0); SyncIndex matchIndex = ths->commitIndex; bool hasAppendEntries = pMsg->dataLen > 0; @@ -605,6 +590,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs code = syncNodePreCommit(ths, pAppendEntry); ASSERT(code == 0); + // update match index matchIndex = pMsg->prevLogIndex + 1; syncEntryDestory(pAppendEntry); @@ -650,11 +636,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition = condition1 || condition2; if (condition) { - sTrace( - "recv SyncAppendEntries, not match, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, condition1:%d, " - "condition2:%d, logOK:%d", - ths->pLogStore->syncLogBeginIndex(ths->pLogStore), ths->pLogStore->syncLogEndIndex(ths->pLogStore), - condition1, condition2, logOK); + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, not match, pre-index:%ld, pre-term:%lu, datalen:%d", + pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + syncNodeEventLog(ths, logBuf); // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); @@ -693,8 +678,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // has entries in SyncAppendEntries msg bool hasAppendEntries = pMsg->dataLen > 0; - sTrace("recv SyncAppendEntries, match, myLastIndex:%ld, hasExtraEntries:%d, hasAppendEntries:%d", myLastIndex, - hasExtraEntries, hasAppendEntries); + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d", + pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + syncNodeEventLog(ths, logBuf); if (hasExtraEntries) { // make log same, rollback deleted entries diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index f13a3604dab1e776583164c0315b243a6a766665..6ca356b4f6049e602e698ca46802d49237bce32c 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -101,32 +101,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries int32_t ret = 0; // print log - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, vgId:%d, term:%lu", ths->vgId, - ths->pRaftStore->currentTerm); - syncAppendEntriesReplyLog2(logBuf, pMsg); + syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg); // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - sInfo("recv SyncAppendEntriesReply, maybe replica already dropped"); - return ret; + syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); + return 0; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term, - ths->pRaftStore->currentTerm); - return ret; + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term); + syncNodeEventLog(ths, logBuf); + return 0; } - syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex); - syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex); if (gRaftDetailLog) { - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", - snapshot.lastApplyIndex, snapshot.lastApplyTerm); + syncNodeEventLog(ths, "recv sync-append-entries-reply, before"); } + syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex); + syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex); // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { @@ -134,12 +129,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries // } if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, error term, receive_term:%lu current_term:%lu", - pMsg->term, ths->pRaftStore->currentTerm); - syncNodeLog2(logBuf, ths); - sError("%s", logBuf); - return ret; + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term); + syncNodeErrorLog(ths, logBuf); + return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); @@ -228,8 +221,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } } - syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex); - syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); + if (gRaftDetailLog) { + syncNodeEventLog(ths, "recv sync-append-entries-reply, after"); + } + syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex); + syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex); return ret; } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 732f450f4cfd41fafe1ff72503fbf4f0ac63633b..8bbee65369e51467ad054334250ddd3bad81ebd4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -416,7 +416,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct } } sMeta->lastConfigIndex = lastIndex; - sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lastConfigIndex:%" PRId64, pSyncNode->vgId, snapshotIndex, + sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex, sMeta->lastConfigIndex); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -433,8 +433,9 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i]; } } + sTrace("vgId:%d, sync get snapshot last config index, index:%ld lcindex:%ld", pSyncNode->vgId, snapshotLastApplyIndex, + lastIndex); - sTrace("sync syncNodeGetSnapshotConfigIndex index:%ld lastConfigIndex:%ld", snapshotLastApplyIndex, lastIndex); return lastIndex; } @@ -1310,6 +1311,10 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg)); + char* printStr = ""; + if (pCfgStr != NULL) { + printStr = pCfgStr; + } if (userStrLen < 256) { char logBuf[256 + 256]; @@ -1321,7 +1326,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, - pSyncNode->changing, pCfgStr); + pSyncNode->changing, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1338,7 +1343,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, - pSyncNode->changing, pCfgStr); + pSyncNode->changing, printStr); } else { snprintf(s, len, "%s", str); } @@ -1957,17 +1962,21 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { taosMemoryFree(pPreEntry); return preTerm; } else { - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; - if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - if (snapshot.lastApplyIndex == preIndex) { - return snapshot.lastApplyTerm; - } + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + if (snapshot.lastApplyIndex == preIndex) { + return snapshot.lastApplyTerm; } } } + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "sync node get pre term error, index:%ld", index); + syncNodeErrorLog(pSyncNode, logBuf); + } while (0); + return SYNC_TERM_INVALID; } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 2cc1eb023991e1b31d4b803fa5845fc284147054..f40eff57d9d4304bad850369cf332e68b12a0ec7 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -107,26 +107,30 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { } char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { - int32_t len = 512; - char *s = taosMemoryMalloc(len); - memset(s, 0, len); - - snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); - char *p = s + strlen(s); - for (int i = 0; i < pSyncCfg->replicaNum; ++i) { - /* - if (p + 128 + 32 > s + len) { - break; + if (pSyncCfg != NULL) { + int32_t len = 512; + char *s = taosMemoryMalloc(len); + memset(s, 0, len); + + snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); + char *p = s + strlen(s); + for (int i = 0; i < pSyncCfg->replicaNum; ++i) { + /* + if (p + 128 + 32 > s + len) { + break; + } + */ + char buf[128 + 32]; + snprintf(buf, sizeof(buf), "%s:%d, ", pSyncCfg->nodeInfo[i].nodeFqdn, pSyncCfg->nodeInfo[i].nodePort); + strncpy(p, buf, sizeof(buf)); + p = s + strlen(s); } - */ - char buf[128 + 32]; - snprintf(buf, sizeof(buf), "%s:%d, ", pSyncCfg->nodeInfo[i].nodeFqdn, pSyncCfg->nodeInfo[i].nodePort); - strncpy(p, buf, sizeof(buf)); - p = s + strlen(s); + strcpy(p - 2, "}"); + + return s; } - strcpy(p - 2, "}"); - return s; + return NULL; } int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) { diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 701b4a7b93bd6c9db05df9152339fa8f393ca95f..cc606f4df884bfd19902c9f775e35e98dc99aa0e 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -16,10 +16,10 @@ #include "syncRaftLog.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" -#include "wal.h" // refactor, log[0 .. n] ==> log[m .. n] -static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex); +static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex); +// static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex); static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore); static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore); static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore); @@ -45,31 +45,43 @@ static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncI static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); // refactor, log[0 .. n] ==> log[m .. n] +/* static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) { - sTrace("raftLogSetBeginIndex beginIndex:%ld", beginIndex); - // if beginIndex == 0, donot need call this funciton ASSERT(beginIndex > 0); SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; + sTrace("vgId:%d, reset wal begin index:%ld", pData->pSyncNode->vgId, beginIndex); + pData->beginIndex = beginIndex; walRestoreFromSnapshot(pWal, beginIndex - 1); return 0; } +*/ + +static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) { + ASSERT(snapshotIndex >= 0); + + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + walRestoreFromSnapshot(pWal, snapshotIndex); + return 0; +} static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - return pData->beginIndex; + SyncIndex firstVer = walGetFirstVer(pWal); + return firstVer; } static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) { return raftLogLastIndex(pLogStore); } static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) { - SyncIndex beginIndex = raftLogBeginIndex(pLogStore); - SyncIndex endIndex = raftLogEndIndex(pLogStore); - return (endIndex < beginIndex); + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + return walIsEmpty(pWal); } static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) { @@ -96,23 +108,8 @@ static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex lastVer = walGetLastVer(pWal); - SyncIndex firstVer = walGetFirstVer(pWal); - - if (lastVer < firstVer) { - // no record - lastIndex = -1; - } else { - if (firstVer >= 0) { - lastIndex = lastVer; - } else if (firstVer == -1) { - lastIndex = -1; - } else { - ASSERT(0); - } - } - - return lastIndex; + return lastVer; } static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) { @@ -122,6 +119,26 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) { return lastVer + 1; } +static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + if (walIsEmpty(pWal)) { + return 0; + } else { + SSyncRaftEntry* pLastEntry; + int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry); + ASSERT(code == 0); + ASSERT(pLastEntry != NULL); + + SyncTerm lastTerm = pLastEntry->term; + taosMemoryFree(pLastEntry); + return lastTerm; + } + + return 0; +} + +/* static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { SyncTerm lastTerm = 0; if (raftLogEntryCount(pLogStore) == 0) { @@ -137,6 +154,7 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { } return lastTerm; } +*/ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; @@ -160,8 +178,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); - sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, - pEntry->index, err, err, errStr, sysErr, sysErrStr); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", + pEntry->index, err, err, errStr, sysErr, sysErrStr); + syncNodeErrorLog(pData->pSyncNode, logBuf); + ASSERT(0); } @@ -229,7 +251,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, *ppEntry = NULL; - SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + SWalReadHandle* pWalHandle = pData->pWalHandle; if (pWalHandle == NULL) { return -1; } @@ -240,12 +263,23 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); - sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index, - err, err, errStr, sysErr, sysErrStr); - int32_t saveErr = terrno; - walCloseReadHandle(pWalHandle); - terrno = saveErr; + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err, + err, errStr, sysErr, sysErrStr); + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + syncNodeEventLog(pData->pSyncNode, logBuf); + } else { + syncNodeErrorLog(pData->pSyncNode, logBuf); + } + } while (0); + + /* + int32_t saveErr = terrno; + walCloseReadHandle(pWalHandle); + terrno = saveErr; + */ return code; } @@ -261,9 +295,11 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen); memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); - int32_t saveErr = terrno; - walCloseReadHandle(pWalHandle); - terrno = saveErr; + /* + int32_t saveErr = terrno; + walCloseReadHandle(pWalHandle); + terrno = saveErr; + */ return code; } @@ -285,6 +321,25 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn return code; } +static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + ASSERT(ppLastEntry != NULL); + + *ppLastEntry = NULL; + if (walIsEmpty(pWal)) { + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; + } else { + SyncIndex lastIndex = raftLogLastIndex(pLogStore); + int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); + return code; + } + + return -1; +} + +/* static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) { *ppLastEntry = NULL; if (raftLogEntryCount(pLogStore) == 0) { @@ -294,6 +349,7 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); return code; } +*/ //------------------------------- SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { @@ -306,16 +362,22 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStoreData* pData = pLogStore->data; pData->pSyncNode = pSyncNode; pData->pWal = pSyncNode->pWal; + ASSERT(pData->pWal != NULL); - SyncIndex firstVer = walGetFirstVer(pData->pWal); - SyncIndex lastVer = walGetLastVer(pData->pWal); - if (firstVer >= 0) { - pData->beginIndex = firstVer; - } else if (firstVer == -1) { - pData->beginIndex = lastVer + 1; - } else { - ASSERT(0); - } + pData->pWalHandle = walOpenReadHandle(pData->pWal); + ASSERT(pData->pWalHandle != NULL); + + /* + SyncIndex firstVer = walGetFirstVer(pData->pWal); + SyncIndex lastVer = walGetLastVer(pData->pWal); + if (firstVer >= 0) { + pData->beginIndex = firstVer; + } else if (firstVer == -1) { + pData->beginIndex = lastVer + 1; + } else { + ASSERT(0); + } + */ pLogStore->appendEntry = logStoreAppendEntry; pLogStore->getEntry = logStoreGetEntry; @@ -325,7 +387,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; pLogStore->getCommitIndex = logStoreGetCommitIndex; - pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex; + // pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex; + pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot; pLogStore->syncLogBeginIndex = raftLogBeginIndex; pLogStore->syncLogEndIndex = raftLogEndIndex; pLogStore->syncLogIsEmpty = raftLogIsEmpty; @@ -344,6 +407,11 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { void logStoreDestory(SSyncLogStore* pLogStore) { if (pLogStore != NULL) { + SSyncLogStoreData* pData = pLogStore->data; + if (pData->pWalHandle != NULL) { + walCloseReadHandle(pData->pWalHandle); + } + taosMemoryFree(pLogStore->data); taosMemoryFree(pLogStore); } @@ -368,8 +436,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); - sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, - pEntry->index, err, err, errStr, sysErr, sysErrStr); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", + pEntry->index, err, err, errStr, sysErr, sysErrStr); + syncNodeErrorLog(pData->pSyncNode, logBuf); ASSERT(0); } @@ -389,7 +460,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SWal* pWal = pData->pWal; if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { - SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + SWalReadHandle* pWalHandle = pData->pWalHandle; ASSERT(pWalHandle != NULL); int32_t code = walReadWithHandle(pWalHandle, index); @@ -398,12 +470,20 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); - sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, - index, err, err, errStr, sysErr, sysErrStr); + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, + err, err, errStr, sysErr, sysErrStr); + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + syncNodeEventLog(pData->pSyncNode, logBuf); + } else { + syncNodeErrorLog(pData->pSyncNode, logBuf); + } + } while (0); ASSERT(0); } - // ASSERT(walReadWithHandle(pWalHandle, index) == 0); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); ASSERT(pEntry != NULL); @@ -417,9 +497,11 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen); memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); - int32_t saveErr = terrno; - walCloseReadHandle(pWalHandle); - terrno = saveErr; + /* + int32_t saveErr = terrno; + walCloseReadHandle(pWalHandle); + terrno = saveErr; + */ return pEntry; @@ -498,6 +580,7 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { return pEntry; } +/* cJSON* logStore2Json(SSyncLogStore* pLogStore) { char u64buf[128] = {0}; SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data; @@ -544,6 +627,57 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot); return pJson; } +*/ + +cJSON* logStore2Json(SSyncLogStore* pLogStore) { + char u64buf[128] = {0}; + SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data; + cJSON* pRoot = cJSON_CreateObject(); + + if (pData != NULL && pData->pWal != NULL) { + snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); + + SyncIndex beginIndex = raftLogBeginIndex(pLogStore); + snprintf(u64buf, sizeof(u64buf), "%ld", beginIndex); + cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); + + SyncIndex endIndex = raftLogEndIndex(pLogStore); + snprintf(u64buf, sizeof(u64buf), "%ld", endIndex); + cJSON_AddStringToObject(pRoot, "endIndex", u64buf); + + int32_t count = raftLogEntryCount(pLogStore); + cJSON_AddNumberToObject(pRoot, "entryCount", count); + + snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore)); + cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); + + cJSON* pEntries = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "pEntries", pEntries); + + if (!raftLogIsEmpty(pLogStore)) { + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); + cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); + syncEntryDestory(pEntry); + } + } + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot); + return pJson; +} char* logStore2Str(SSyncLogStore* pLogStore) { cJSON* pJson = logStore2Json(pLogStore); @@ -563,7 +697,8 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) { snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex); + SyncIndex beginIndex = raftLogBeginIndex(pLogStore); + snprintf(u64buf, sizeof(u64buf), "%ld", beginIndex); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); SyncIndex endIndex = raftLogEndIndex(pLogStore); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index c378926e286761ef17508840d198e13eca601792..02cd977e1e65a29a0249584a18de2dd126f85787 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -219,6 +219,17 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { int32_t ret = 0; + do { + char host[128]; + uint16_t port; + syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); + sDebug( + "vgId:%d, send sync-append-entries to %s:%d, term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, commit:%ld, " + "datalen:%d", + pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, + pMsg->commitIndex, pMsg->dataLen); + } while (0); + SRpcMsg rpcMsg; syncAppendEntries2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index ed9d62fd149d94c89f12c6f99518359ba2ac77ed..4fc01284007ec389888b4dc160f68fc1dc954523 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -545,7 +545,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; } - pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); + // pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); + pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex); // maybe update lastconfig if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { diff --git a/source/libs/sync/test/syncRaftLogTest2.cpp b/source/libs/sync/test/syncRaftLogTest2.cpp index bd6fa7c2c3c5616b206540e63e0b52821608691f..e3ceae061535bdcdb579ba83465803a3eac7b096 100644 --- a/source/libs/sync/test/syncRaftLogTest2.cpp +++ b/source/libs/sync/test/syncRaftLogTest2.cpp @@ -113,7 +113,8 @@ void test2() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); pSyncNode->pLogStore = pLogStore; - pLogStore->syncLogSetBeginIndex(pLogStore, 5); + //pLogStore->syncLogSetBeginIndex(pLogStore, 5); + pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4); logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore); if (gAssert) { @@ -228,7 +229,8 @@ void test4() { assert(pLogStore); pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore); - pLogStore->syncLogSetBeginIndex(pLogStore, 5); + //pLogStore->syncLogSetBeginIndex(pLogStore, 5); + pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4); for (int i = 5; i <= 9; ++i) { int32_t dataLen = 10; @@ -289,7 +291,8 @@ void test5() { assert(pLogStore); pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore); - pLogStore->syncLogSetBeginIndex(pLogStore, 5); + //pLogStore->syncLogSetBeginIndex(pLogStore, 5); + pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4); for (int i = 5; i <= 9; ++i) { int32_t dataLen = 10; @@ -363,7 +366,8 @@ void test6() { assert(pLogStore); pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore); - pLogStore->syncLogSetBeginIndex(pLogStore, 5); + // pLogStore->syncLogSetBeginIndex(pLogStore, 5); + pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4); for (int i = 5; i <= 9; ++i) { int32_t dataLen = 10; @@ -405,14 +409,32 @@ void test6() { assert(pLogStore->syncLogLastTerm(pLogStore) == 0); } + do { + SyncIndex firstVer = walGetFirstVer(pWal); + SyncIndex lastVer = walGetLastVer(pWal); + bool isEmpty = walIsEmpty(pWal); + printf("before -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty); + } while (0); + logStoreDestory(pLogStore); cleanup(); + + // restart init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); pSyncNode->pLogStore = pLogStore; + + + do { + SyncIndex firstVer = walGetFirstVer(pWal); + SyncIndex lastVer = walGetLastVer(pWal); + bool isEmpty = walIsEmpty(pWal); + printf("after -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty); + } while (0); + logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore); if (gAssert) { @@ -432,17 +454,20 @@ void test6() { int main(int argc, char** argv) { tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE; + gRaftDetailLog = true; if (argc == 2) { gAssert = atoi(argv[1]); } sTrace("gAssert : %d", gAssert); +/* test1(); test2(); test3(); test4(); test5(); +*/ test6(); return 0; diff --git a/source/libs/sync/test/syncRaftLogTest3.cpp b/source/libs/sync/test/syncRaftLogTest3.cpp index c81a6d3ca9772f0ff3642062ab1854f1dd36342c..302e29a091277e085dcd67d0e2a55df00108a0b0 100644 --- a/source/libs/sync/test/syncRaftLogTest3.cpp +++ b/source/libs/sync/test/syncRaftLogTest3.cpp @@ -312,7 +312,8 @@ void test5() { pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore); - pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6); + //pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6); + pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, 5); for (int i = 6; i <= 10; ++i) { int32_t dataLen = 10; SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); @@ -372,6 +373,7 @@ void test5() { int main(int argc, char** argv) { tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE; + gRaftDetailLog = true; if (argc == 2) { gAssert = atoi(argv[1]); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 9bed6d230bcd227317075c1894462b1d805d270b..682afbb785784f0e158f95750fc376130a6fd696 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -272,14 +272,23 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { } } + if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { + wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pRead->pWal->vers.firstVer, + pRead->pWal->vers.lastVer); + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; + } + ASSERT(taosValidFile(pRead->pReadLogTFile) == true); code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead)); if (code != sizeof(SWalHead)) { if (code < 0) terrno = TAOS_SYSTEM_ERROR(errno); - else + else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + ASSERT(0); + } return -1; } @@ -304,8 +313,10 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { pRead->pHead->head.bodyLen) { if (code < 0) terrno = TAOS_SYSTEM_ERROR(errno); - else + else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + ASSERT(0); + } return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 031289f93ea8ab58dadd55be7490b4ec4a8caeda..34f83fb8a9aaf93e5b38603ca31377b1c152d5f0 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -150,6 +150,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ASSERT(code == 0); if (code != 0) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + ASSERT(0); return -1; } if (head.head.version != ver) { diff --git a/tests/pytest/util/cluster.py b/tests/pytest/util/cluster.py index 5debfab2c4266954577fcabf5e78c8b000f5ed04..6eb78f07716b22dbff09b10a9d779ec8b5672b36 100644 --- a/tests/pytest/util/cluster.py +++ b/tests/pytest/util/cluster.py @@ -21,45 +21,39 @@ class ClusterDnodes(TDDnodes): self.testCluster = False self.valgrind = 0 self.killValgrind = 1 - self.independent = True - self.dnodeNums = 5 - - # def getTDDnodes(dnodeNums): - - # return class ConfigureyCluster: - """configure dnodes and return TDDnodes list, it can """ + """This will create defined number of dnodes and create a cluset. + at the same time, it will return TDDnodes list: dnodes, """ + hostname= socket.gethostname() def __init__(self): - self.dnodes = None - self.dnodes_nums = 5 + self.dnodes = [] + self.dnodeNums = 5 self.independent = True - self.start_port = 6030 + self.startPort = 6030 self.portStep = 100 - hostname1= socket.gethostname() + self.mnodeNums = 0 - def configure_cluster(self ,dnodes_nums=5,independent=True,start_port=6030,portStep=100,hostname="%s"%hostname1): - self.start_port=int(start_port) + def configure_cluster(self ,dnodeNums=5,mnodeNums=0,startPort=6030,portStep=100,hostname="%s"%hostname): + self.startPort=int(startPort) self.portStep=int(portStep) self.hostname=hostname - self.dnodes_nums = int(dnodes_nums) - self.independent = independent + self.dnodeNums = int(dnodeNums) + self.mnodeNums = int(mnodeNums) self.dnodes = [] - start_port_sec = 6130 - for num in range(1, (self.dnodes_nums+1)): + startPort_sec = int(startPort+portStep) + for num in range(1, (self.dnodeNums+1)): dnode = TDDnode(num) - dnode.addExtraCfg("firstEp", f"{hostname}:{self.start_port}") + dnode.addExtraCfg("firstEp", f"{hostname}:{self.startPort}") dnode.addExtraCfg("fqdn", f"{hostname}") - dnode.addExtraCfg("serverPort", f"{self.start_port + (num-1)*self.portStep}") - # dnode.addExtraCfg("monitorFqdn", hostname) - # dnode.addExtraCfg("monitorPort", 7043) - dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}") - # configure three dnoe don't support vnodes - if self.dnodes_nums > 4 : - if self.independent and (num < 4): - dnode.addExtraCfg("supportVnodes", 0) + dnode.addExtraCfg("serverPort", f"{self.startPort + (num-1)*self.portStep}") + dnode.addExtraCfg("secondEp", f"{hostname}:{startPort_sec}") + + # configure dnoe of independent mnodes + if num <= self.mnodeNums and self.mnodeNums != 0 : + dnode.addExtraCfg("supportVnodes", 0) # print(dnode) self.dnodes.append(dnode) return self.dnodes @@ -69,24 +63,7 @@ class ConfigureyCluster: for dnode in self.dnodes[1:]: # print(dnode.cfgDict) dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] - print(dnode_id) tdSql.execute(" create dnode '%s';"%dnode_id) - # count=0 - # while count < 10: - # time.sleep(1) - # tdSql.query("show dnodes;") - # if tdSql.checkRows(self.dnodes_nums) : - # print("mnode is three nodes") - # if tdSql.queryResult[0][4]=='leader' : - # if tdSql.queryResult[2][4]=='offline': - # if tdSql.queryResult[1][2]=='follower': - # print("stop mnodes on dnode 3 successfully in 10s") - # break - # count+=1 - # else: - # print("stop mnodes on dnode 3 failed in 10s") - # return -1 - checkstatus=False @@ -97,206 +74,19 @@ class ConfigureyCluster: tdSql.query("show dnodes") # tdLog.debug(tdSql.queryResult) status=0 - for i in range(self.dnodes_nums): + for i in range(self.dnodeNums): if tdSql.queryResult[i][4] == "ready": status+=1 - tdLog.debug(status) + # tdLog.debug(status) - if status == self.dnodes_nums: - tdLog.debug(" create cluster with %d dnode and check cluster dnode all ready within 5s! " %self.dnodes_nums) + if status == self.dnodeNums: + tdLog.debug(" create cluster with %d dnode and check cluster dnode all ready within 5s! " %self.dnodeNums) break count+=1 time.sleep(1) else: - tdLog.debug("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodes_nums) + tdLog.debug("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodeNums) return -1 -cluster = ConfigureyCluster() - - # def start(self ,dnodes_nums): - - # self.TDDnodes = ClusterDnodes(dnodes) - # self.TDDnodes.init("") - # self.TDDnodes.setTestCluster(testCluster) - # self.TDDnodes.setValgrind(valgrind) - # self.TDDnodes.stopAll() - # for dnode in self.TDDnodes.dnodes: - # self.TDDnodes.deploy(dnode.index,{}) - - # for dnode in self.TDDnodes.dnodes: - # self.TDDnodes.starttaosd(dnode.index) - - # # create cluster - # for dnode in self.TDDnodes.dnodes[1:]: - # # print(dnode.cfgDict) - # dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] - # dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] - # dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] - # cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" - # print(cmd) - # os.system(cmd) - - # time.sleep(2) - # tdLog.info(" create cluster with %d dnode done! " %dnodes_nums) - - # def buildcluster(self,dnodenumber): - # self.depoly_cluster(dnodenumber) - # self.master_dnode = self.TDDnodes.dnodes[0] - # self.host=self.master_dnode.cfgDict["fqdn"] - # conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir) - # tdSql.init(conn1.cursor()) - - # def checkdnodes(self,dnodenumber): - # count=0 - # while count < 10: - # time.sleep(1) - # statusReadyBumber=0 - # tdSql.query("show dnodes;") - # if tdSql.checkRows(dnodenumber) : - # print("dnode is %d nodes"%dnodenumber) - # for i in range(dnodenumber): - # if tdSql.queryResult[i][4] !='ready' : - # status=tdSql.queryResult[i][4] - # print("dnode:%d status is %s "%(i,status)) - # break - # else: - # statusReadyBumber+=1 - # print(statusReadyBumber) - # if statusReadyBumber == dnodenumber : - # print("all of %d mnodes is ready in 10s "%dnodenumber) - # return True - # break - # count+=1 - # else: - # print("%d mnodes is not ready in 10s "%dnodenumber) - # return False - - - # def check3mnode(self): - # count=0 - # while count < 10: - # time.sleep(1) - # tdSql.query("show mnodes;") - # if tdSql.checkRows(3) : - # print("mnode is three nodes") - # if tdSql.queryResult[0][2]=='leader' : - # if tdSql.queryResult[1][2]=='follower': - # if tdSql.queryResult[2][2]=='follower': - # print("three mnodes is ready in 10s") - # break - # elif tdSql.queryResult[0][2]=='follower' : - # if tdSql.queryResult[1][2]=='leader': - # if tdSql.queryResult[2][2]=='follower': - # print("three mnodes is ready in 10s") - # break - # elif tdSql.queryResult[0][2]=='follower' : - # if tdSql.queryResult[1][2]=='follower': - # if tdSql.queryResult[2][2]=='leader': - # print("three mnodes is ready in 10s") - # break - # count+=1 - # else: - # print("three mnodes is not ready in 10s ") - # return -1 - - # tdSql.query("show mnodes;") - # tdSql.checkRows(3) - # tdSql.checkData(0,1,'%s:6030'%self.host) - # tdSql.checkData(0,3,'ready') - # tdSql.checkData(1,1,'%s:6130'%self.host) - # tdSql.checkData(1,3,'ready') - # tdSql.checkData(2,1,'%s:6230'%self.host) - # tdSql.checkData(2,3,'ready') - - # def check3mnode1off(self): - # count=0 - # while count < 10: - # time.sleep(1) - # tdSql.query("show mnodes;") - # if tdSql.checkRows(3) : - # print("mnode is three nodes") - # if tdSql.queryResult[0][2]=='offline' : - # if tdSql.queryResult[1][2]=='leader': - # if tdSql.queryResult[2][2]=='follower': - # print("stop mnodes on dnode 2 successfully in 10s") - # break - # elif tdSql.queryResult[1][2]=='follower': - # if tdSql.queryResult[2][2]=='leader': - # print("stop mnodes on dnode 2 successfully in 10s") - # break - # count+=1 - # else: - # print("stop mnodes on dnode 2 failed in 10s ") - # return -1 - # tdSql.error("drop mnode on dnode 1;") - - # tdSql.query("show mnodes;") - # tdSql.checkRows(3) - # tdSql.checkData(0,1,'%s:6030'%self.host) - # tdSql.checkData(0,2,'offline') - # tdSql.checkData(0,3,'ready') - # tdSql.checkData(1,1,'%s:6130'%self.host) - # tdSql.checkData(1,3,'ready') - # tdSql.checkData(2,1,'%s:6230'%self.host) - # tdSql.checkData(2,3,'ready') - - # def check3mnode2off(self): - # count=0 - # while count < 40: - # time.sleep(1) - # tdSql.query("show mnodes;") - # if tdSql.checkRows(3) : - # print("mnode is three nodes") - # if tdSql.queryResult[0][2]=='leader' : - # if tdSql.queryResult[1][2]=='offline': - # if tdSql.queryResult[2][2]=='follower': - # print("stop mnodes on dnode 2 successfully in 10s") - # break - # count+=1 - # else: - # print("stop mnodes on dnode 2 failed in 10s ") - # return -1 - # tdSql.error("drop mnode on dnode 2;") - - # tdSql.query("show mnodes;") - # tdSql.checkRows(3) - # tdSql.checkData(0,1,'%s:6030'%self.host) - # tdSql.checkData(0,2,'leader') - # tdSql.checkData(0,3,'ready') - # tdSql.checkData(1,1,'%s:6130'%self.host) - # tdSql.checkData(1,2,'offline') - # tdSql.checkData(1,3,'ready') - # tdSql.checkData(2,1,'%s:6230'%self.host) - # tdSql.checkData(2,2,'follower') - # tdSql.checkData(2,3,'ready') - - # def check3mnode3off(self): - # count=0 - # while count < 10: - # time.sleep(1) - # tdSql.query("show mnodes;") - # if tdSql.checkRows(3) : - # print("mnode is three nodes") - # if tdSql.queryResult[0][2]=='leader' : - # if tdSql.queryResult[2][2]=='offline': - # if tdSql.queryResult[1][2]=='follower': - # print("stop mnodes on dnode 3 successfully in 10s") - # break - # count+=1 - # else: - # print("stop mnodes on dnode 3 failed in 10s") - # return -1 - # tdSql.error("drop mnode on dnode 3;") - # tdSql.query("show mnodes;") - # tdSql.checkRows(3) - # tdSql.checkData(0,1,'%s:6030'%self.host) - # tdSql.checkData(0,2,'leader') - # tdSql.checkData(0,3,'ready') - # tdSql.checkData(1,1,'%s:6130'%self.host) - # tdSql.checkData(1,2,'follower') - # tdSql.checkData(1,3,'ready') - # tdSql.checkData(2,1,'%s:6230'%self.host) - # tdSql.checkData(2,2,'offline') - # tdSql.checkData(2,3,'ready') - +cluster = ConfigureyCluster() \ No newline at end of file diff --git a/tests/script/tmp/monitor.sim b/tests/script/tmp/monitor.sim index ba98bec2d05ef9c976ac93c8092b2d5051caf62e..036d4cc6db0959db38fd4281ab3f8020a08f2ab2 100644 --- a/tests/script/tmp/monitor.sim +++ b/tests/script/tmp/monitor.sim @@ -12,6 +12,8 @@ sql connect print =============== show dnodes sleep 2000 sql create database db vgroups 2; +sql use db; +sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd"; sleep 2000 print =============== create drop qnode 1 diff --git a/tests/system-test/6-cluster/5dnode3mnodeStop.py b/tests/system-test/6-cluster/5dnode3mnodeStop.py index e1f66ceacb9a9e2f5f4f17547243262a227c4eb7..69b9c3d879bd4e426bdff4c53215ad206b5b39a3 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStop.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStop.py @@ -159,13 +159,10 @@ class TDTestCase: tdSql.query("show mnodes;") tdSql.checkRows(3) tdSql.checkData(0,1,'%s:6030'%self.host) - tdSql.checkData(0,2,'leader') tdSql.checkData(0,3,'ready') tdSql.checkData(1,1,'%s:6130'%self.host) - tdSql.checkData(1,2,'offline') tdSql.checkData(1,3,'ready') tdSql.checkData(2,1,'%s:6230'%self.host) - tdSql.checkData(2,2,'follower') tdSql.checkData(2,3,'ready') def check3mnode3off(self): @@ -265,13 +262,6 @@ class TDTestCase: while stopcount <= 2: for i in range(dnodenumber): tdDnodes[i].stoptaosd() - # if i == 1 : - # self.check3mnode2off() - # elif i == 2 : - # self.check3mnode3off() - # elif i == 0: - # self.check3mnode1off() - tdDnodes[i].starttaosd() # self.check3mnode() stopcount+=1 diff --git a/tests/system-test/7-tmq/tmq3mnodeSwitch.py b/tests/system-test/7-tmq/tmq3mnodeSwitch.py new file mode 100644 index 0000000000000000000000000000000000000000..2769c3867bbf152269b55fe57d183d97cfd7d58a --- /dev/null +++ b/tests/system-test/7-tmq/tmq3mnodeSwitch.py @@ -0,0 +1,272 @@ + +from ntpath import join +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * + +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.dnodes = 5 + self.mnodes = 3 + self.idIndex = 0 + self.roleIndex = 2 + self.mnodeStatusIndex = 3 + self.mnodeEpIndex = 1 + self.dnodeStatusIndex = 4 + self.mnodeCheckCnt = 10 + self.host = socket.gethostname() + self.startPort = 6030 + self.portStep = 100 + self.dnodeOfLeader = 0 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def checkDnodesStatusAndCreateMnode(self,dnodeNumbers): + count=0 + while count < dnodeNumbers: + tdSql.query("show dnodes") + # tdLog.debug(tdSql.queryResult) + dCnt = 0 + for i in range(dnodeNumbers): + if tdSql.queryResult[i][self.dnodeStatusIndex] != "ready": + break + else: + dCnt += 1 + if dCnt == dnodeNumbers: + break + time.sleep(1) + tdLog.debug("............... waiting for all dnodes ready!") + + tdLog.info("==============create two new mnodes ========") + tdSql.execute("create mnode on dnode 2") + tdSql.execute("create mnode on dnode 3") + self.check3mnode() + return + + def check3mnode(self): + count=0 + while count < self.mnodeCheckCnt: + time.sleep(1) + tdSql.query("show mnodes;") + if tdSql.checkRows(self.mnodes) : + tdLog.debug("mnode is three nodes") + else: + tdLog.exit("mnode number is correct") + + roleOfMnode0 = tdSql.queryResult[0][self.roleIndex] + roleOfMnode1 = tdSql.queryResult[1][self.roleIndex] + roleOfMnode2 = tdSql.queryResult[2][self.roleIndex] + + if roleOfMnode0=='leader' and roleOfMnode1=='follower' and roleOfMnode2 == 'follower' : + self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex] + break + elif roleOfMnode0=='follower' and roleOfMnode1=='leader' and roleOfMnode2 == 'follower' : + self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex] + break + elif roleOfMnode0=='follower' and roleOfMnode1=='follower' and roleOfMnode2 == 'leader' : + self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex] + break + else: + count+=1 + else: + tdLog.exit("three mnodes is not ready in 10s ") + + tdSql.query("show mnodes;") + tdSql.checkRows(self.mnodes) + tdSql.checkData(0,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort)) + tdSql.checkData(0,self.mnodeStatusIndex,'ready') + tdSql.checkData(1,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep)) + tdSql.checkData(1,self.mnodeStatusIndex,'ready') + tdSql.checkData(2,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep*2)) + tdSql.checkData(2,self.mnodeStatusIndex,'ready') + + def check3mnode1off(self): + count=0 + while count < self.mnodeCheckCnt: + time.sleep(1) + tdSql.query("show mnodes") + tdLog.debug(tdSql.queryResult) + # if tdSql.checkRows(self.mnodes) : + # tdLog.debug("mnode is three nodes") + # else: + # tdLog.exit("mnode number is correct") + + roleOfMnode0 = tdSql.queryResult[0][self.roleIndex] + roleOfMnode1 = tdSql.queryResult[1][self.roleIndex] + roleOfMnode2 = tdSql.queryResult[2][self.roleIndex] + + if roleOfMnode0=='offline' : + if roleOfMnode1=='leader' and roleOfMnode2 == 'follower' : + self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex] + break + elif roleOfMnode1=='follower' and roleOfMnode2 == 'leader' : + self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex] + break + elif roleOfMnode1=='offline' : + if roleOfMnode0=='leader' and roleOfMnode2 == 'follower' : + self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex] + break + elif roleOfMnode0=='follower' and roleOfMnode2 == 'leader' : + self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex] + break + elif roleOfMnode2=='offline' : + if roleOfMnode0=='leader' and roleOfMnode1 == 'follower' : + self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex] + break + elif roleOfMnode0=='follower' and roleOfMnode1 == 'leader' : + self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex] + break + + count+=1 + else: + tdLog.exit("three mnodes is not ready in 10s ") + + def checkFileContent(self, consumerId, queryString): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId) + cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) + tdLog.info(cmdStr) + os.system(cmdStr) + + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) + tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) + + consumeFile = open(consumeRowsFile, mode='r') + queryFile = open(dstFile, mode='r') + + # skip first line for it is schema + queryFile.readline() + + while True: + dst = queryFile.readline() + src = consumeFile.readline() + + if dst: + if dst != src: + tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) + else: + break + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db1', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}, {'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'rowsPerTbl': 100000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1} + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) + tdLog.info("create stb") + tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) + tdLog.info("create ctb") + tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) + tdLog.info("async insert data") + pThread = tmqCom.asyncInsertData(paraDict) + + tdLog.info("create topics from stb with filter") + queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) + + tdLog.info("wait the notify info of start consume") + tmqCom.getStartConsumeNotifyFromTmqsim() + + tdLog.info("start switch mnode ................") + tdDnodes = cluster.dnodes + + tdLog.info("1. stop dnode 0") + tdDnodes[0].stoptaosd() + time.sleep(10) + self.check3mnode1off() + + tdLog.info("2. start dnode 0") + tdDnodes[0].starttaosd() + self.check3mnode() + + tdLog.info("3. stop dnode 1") + tdDnodes[1].stoptaosd() + time.sleep(10) + self.check3mnode1off() + + tdLog.info("switch end and wait insert data end ................") + pThread.join() + + tdLog.info("check the consume result") + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectRowsList[0] != resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) + tdLog.exit("0 tmq consume rows error!") + + self.checkFileContent(consumerId, queryString) + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + tdLog.printNoPrefix("======== Notes: must add '-N 5' for run the script ========") + self.checkDnodesStatusAndCreateMnode(self.dnodes) + self.tmqCase1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 9b23056546de3d7e74cd2bae9c3503d95df53a44..d17e36fc975e36d582139c199f49d69639a23a3a 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -204,6 +204,35 @@ class TMQCom: tdLog.debug("insert data ............ [OK]") return + def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + t = time.time() + startTs = int(round(t * 1000)) + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s%d values "%(ctbPrefix,i) + for j in range(rowsPerTbl): + if (j % 2 == 0): + sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j) + else: + sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s%d values " %(ctbPrefix,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0): tdLog.debug("start to insert data ............") tsql.execute("use %s" %dbName) @@ -291,6 +320,17 @@ class TMQCom: pThread.start() return pThread + def threadFunctionForInsert(self, **paraDict): + # create new connector for new tdSql instance in my thread + newTdSql = tdCom.newTdSql() + self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) + return + + def asyncInsertData(self, paraDict): + pThread = threading.Thread(target=self.threadFunctionForInsert, kwargs=paraDict) + pThread.start() + return pThread + def close(self): self.cursor.close() diff --git a/tests/system-test/7-tmq/tmqUdf.py b/tests/system-test/7-tmq/tmqUdf.py index f1c451de859d1321e917ed0441ed144d297854c0..d241a97e0cf93ed32e2d27b1e28657846f88737a 100644 --- a/tests/system-test/7-tmq/tmqUdf.py +++ b/tests/system-test/7-tmq/tmqUdf.py @@ -113,7 +113,7 @@ class TDTestCase: tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) tdLog.info("create topics from stb with filter") - queryString = "select ts, c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) + queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 19a67e924c055afa6c470f69f957804d15b80e66..73e6716cad36fff4268eb28d5842339eabccf91e 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -112,7 +112,7 @@ python3 ./test.py -f 2-query/twa.py python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py -python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 +python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 # python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py @@ -134,3 +134,4 @@ python3 ./test.py -f 7-tmq/schema.py python3 ./test.py -f 7-tmq/stbFilter.py python3 ./test.py -f 7-tmq/tmqCheckData.py python3 ./test.py -f 7-tmq/tmqUdf.py +#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5 diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 4ca16e7a5766d61bd569d9f9ae31ee98c14f48ac..7d78f5ab21580508a6431fcf2fedb981df6c2e63 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -60,10 +60,11 @@ if __name__ == "__main__": stop = 0 restart = False dnodeNums = 1 + mnodeNums = 0 updateCfgDict = {} execCmd = "" - opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:', [ - 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums']) + opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:', [ + 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums']) for key, value in opts: if key in ['-h', '--help']: tdLog.printNoPrefix( @@ -79,7 +80,8 @@ if __name__ == "__main__": tdLog.printNoPrefix('-d update cfg dict, base64 json str') tdLog.printNoPrefix('-k not kill valgrind processer') tdLog.printNoPrefix('-e eval str to run') - tdLog.printNoPrefix('-N create dnodes numbers clusters') + tdLog.printNoPrefix('-N create dnodes numbers in clusters') + tdLog.printNoPrefix('-M create mnode numbers in clusters') sys.exit(0) @@ -133,6 +135,9 @@ if __name__ == "__main__": if key in ['-N', '--dnodeNums']: dnodeNums = value + if key in ['-M', '--mnodeNums']: + mnodeNums = value + if not execCmd == "": tdDnodes.init(deployPath) print(execCmd) @@ -244,9 +249,8 @@ if __name__ == "__main__": tdDnodes.start(1) tdCases.logSql(logSql) else : - print("start cluster and dnodes number") - - dnodeslist = cluster.configure_cluster(dnodes_nums=dnodeNums,independent=True) + tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums)) + dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums,mnodeNums=mnodeNums) tdDnodes = ClusterDnodes(dnodeslist) tdDnodes.init(deployPath, masterIp) tdDnodes.setTestCluster(testCluster) diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 053cd79432319dc35dbd50bb088d0d918b1e2099..e96f5d4793a29c34615128d72953c5e7e9a22f1a 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -453,7 +453,17 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) int32_t precision = taos_result_precision(msg); const char* tbName = tmq_get_table_name(msg); - dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision); + #if 0 + // get schema + //============================== stub =================================================// + for (int32_t i = 0; i < numOfFields; i++) { + taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes); + } + //============================== stub =================================================// + #endif + + dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision); + taos_print_row(buf, row, fields, numOfFields); if (0 != g_stConfInfo.showRowFlag) { @@ -656,12 +666,13 @@ void* consumeThreadFunc(void* param) { pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); if (pInfo->taos == NULL) { taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n"); - exit(-1); + return NULL; } build_consumer(pInfo); build_topic_list(pInfo); if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { + taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n"); assert(0); return NULL; } @@ -669,7 +680,9 @@ void* consumeThreadFunc(void* param) { int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); if (err != 0) { pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); + taosFprintfFile(g_fp, "tmq_subscribe()! reason: %s\n", tmq_err2str(err)); + assert(0); + return NULL; } tmq_list_destroy(pInfo->topicList); @@ -688,14 +701,13 @@ void* consumeThreadFunc(void* param) { err = tmq_unsubscribe(pInfo->tmq); if (err != 0) { pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); - /*pInfo->consumeMsgCnt = -1;*/ - /*return NULL;*/ + taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err)); } err = tmq_consumer_close(pInfo->tmq); if (err != 0) { pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); - /*exit(-1);*/ + taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err)); } pInfo->tmq = NULL;