提交 0d9ed6d6 编写于 作者: wmmhello's avatar wmmhello

fix:merge from 3.0

......@@ -71,8 +71,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
#define IS_JSON_NULL(type, data) ((type) == TSDB_DATA_TYPE_JSON && \
(*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data)))
#define IS_JSON_NULL(type, data) \
((type) == TSDB_DATA_TYPE_JSON && (*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data)))
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
if (!pColumnInfoData->hasNull) {
......@@ -186,7 +186,8 @@ int32_t getJsonValueLen(const char* data);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, uint32_t* capacity,
const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
const SDataBlockInfo* pBlockInfo);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
......@@ -222,6 +223,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
......
......@@ -73,10 +73,12 @@ void mndStop(SMnode *pMnode);
* @param pMnode The mnode object.
* @param pCluster
* @param pVgroup
* @param pStbInfo
* @param pGrant
* @return int32_t 0 for success, -1 for failure.
*/
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pCluster, SMonVgroupInfo *pVgroup, SMonGrantInfo *pGrant);
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo);
/**
* @brief Get mnode loads for status msg.
......
......@@ -36,7 +36,7 @@ typedef struct SReadHandle {
void* vnode;
void* mnd;
SMsgCb* pMsgCb;
int8_t initTsdbReader;
// int8_t initTsdbReader;
} SReadHandle;
enum {
......
......@@ -138,6 +138,15 @@ typedef struct {
SArray *vgroups; // array of SMonVgroupDesc
} SMonVgroupInfo;
typedef struct {
char stb_name[TSDB_TABLE_NAME_LEN];
char database_name[TSDB_DB_NAME_LEN];
} SMonStbDesc;
typedef struct {
SArray *stbs; // array of SMonStbDesc
} SMonStbInfo;
typedef struct {
int32_t expire_time;
int64_t timeseries_used;
......@@ -147,6 +156,7 @@ typedef struct {
typedef struct {
SMonClusterInfo cluster;
SMonVgroupInfo vgroup;
SMonStbInfo stb;
SMonGrantInfo grant;
SMonSysInfo sys;
SMonLogs log;
......
......@@ -344,10 +344,12 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(pBlock);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(pBlock);
} else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
......
......@@ -157,13 +157,13 @@ typedef struct SSyncLogStore {
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
// refactor, log[0 .. n] ==> log[m .. n]
int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
int32_t (*syncLogResetBeginIndex)(struct SSyncLogStore* pLogStore);
// int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
// bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index);
int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
......
......@@ -263,7 +263,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui
pColumnInfoData->varmeta.length = len + oldLen;
} else {
if (finalNumOfRows > *capacity) {
ASSERT(finalNumOfRows*pColumnInfoData->info.bytes);
ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
if (tmp == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
......@@ -293,7 +293,8 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui
return numOfRow1 + numOfRow2;
}
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo) {
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
const SDataBlockInfo* pBlockInfo) {
ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
if (numOfRows <= 0) {
return numOfRows;
......@@ -327,9 +328,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
return 0;
}
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
return taosArrayGetSize(pBlock->pDataBlock);
}
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); }
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
......@@ -396,7 +395,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
int32_t pageSize) {
ASSERT(pBlock != NULL && stopIndex != NULL);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int32_t numOfRows = pBlock->info.rows;
int32_t bitmapChar = 1;
......@@ -512,7 +511,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
// write the number of rows
*(uint32_t*)buf = pBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int32_t numOfRows = pBlock->info.rows;
char* pStart = buf + sizeof(uint32_t);
......@@ -542,7 +541,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
pBlock->info.rows = *(int32_t*)buf;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
const char* pStart = buf + sizeof(uint32_t);
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -734,7 +733,7 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock,
int32_t tupleIndex) {
int32_t code = 0;
size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = &pDstCols[i];
......@@ -794,7 +793,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
int32_t rows = pDataBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
if (pCols == NULL) {
......@@ -902,7 +901,6 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
} else { // var data type
}
} else if (numOfCols == 2) {
}
}
......@@ -1103,14 +1101,14 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
}
}
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo *pBlockInfo, uint32_t numOfRows) {
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) {
ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows);
if (numOfRows < pBlockInfo->capacity) {
return TSDB_CODE_SUCCESS;
}
// todo temp disable it
// ASSERT(pColumn->info.bytes != 0);
// ASSERT(pColumn->info.bytes != 0);
int32_t existedRows = pBlockInfo->rows;
......@@ -1141,7 +1139,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo *
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
pColumn->pData = tmp;
}
......@@ -1197,6 +1195,40 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
taosMemoryFreeClear(pBlock);
return NULL;
}
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
ASSERT(src != NULL);
dst->info = src->info;
dst->info.rows = 0;
dst->info.capacity = 0;
size_t numOfCols = taosArrayGetSize(src->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(src->pDataBlock, i);
SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
blockDataAppendColInfo(dst, &colInfo);
}
int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
if (pSrc->pData == NULL) {
continue;
}
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
}
dst->info.rows = src->info.rows;
dst->info.capacity = src->info.rows;
return 0;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if (pDataBlock == NULL) {
......@@ -1272,7 +1304,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat
}
// todo disable it temporarily
// ASSERT(pColInfoData->info.type != 0);
// ASSERT(pColInfoData->info.type != 0);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pBlock->info.hasVarCol = true;
}
......@@ -1284,7 +1316,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
SColumnInfoData col = {.hasNull = true};
col.info.colId = colId;
col.info.type = type;
col.info.type = type;
col.info.bytes = bytes;
return col;
......@@ -1552,9 +1584,9 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
int32_t sz = taosArrayGetSize(dataBlocks);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
......@@ -1633,8 +1665,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
// int32_t rowSize = pDataBlock->info.rowSize;
// int64_t groupId = pDataBlock->info.groupId;
// int32_t rowSize = pDataBlock->info.rowSize;
// int64_t groupId = pDataBlock->info.groupId;
if (colNum <= 1) {
// invalid if only with TS col
......
......@@ -17,7 +17,7 @@
#include "mmInt.h"
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant);
mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant);
}
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
......
......@@ -614,7 +614,7 @@ int64_t mndGenerateUid(char *name, int32_t len) {
}
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
if (mndAcquireRpcRef(pMnode) != 0) return -1;
SSdb *pSdb = pMnode->pSdb;
......@@ -623,7 +623,9 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
pStbInfo->stbs == NULL) {
mndReleaseRpcRef(pMnode);
return -1;
}
......@@ -714,6 +716,27 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
sdbRelease(pSdb, pVgroup);
}
// stb info
pIter = NULL;
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
if (pIter == NULL) break;
SMonStbDesc desc = {0};
SName name1 = {0};
tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetDbName(&name1, desc.database_name);
SName name2 = {0};
tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);
taosArrayPush(pStbInfo->stbs, &desc);
sdbRelease(pSdb, pStb);
}
// grant info
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
......
......@@ -403,7 +403,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.reader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTsdbReader = 1,
// .initTsdbReader = 1,
};
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]);
......@@ -476,7 +476,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTsdbReader = 1,
// .initTsdbReader = 1,
};
/*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
......
......@@ -66,6 +66,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp
if (qStreamScanSnapshot(task) < 0) {
ASSERT(0);
}
// set version
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
......
......@@ -44,6 +44,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
}
char* fname = buildFileName(pStore->pTq->path);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
taosMemoryFree(fname);
if (pFile != NULL) {
STqOffsetHead head = {0};
int64_t code;
......@@ -77,7 +78,6 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
}
taosCloseFile(&pFile);
taosMemoryFree(fname);
}
return pStore;
}
......@@ -102,6 +102,7 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
// TODO file name should be with a version
char* fname = buildFileName(pStore->pTq->path);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
taosMemoryFree(fname);
if (pFile == NULL) {
ASSERT(0);
return -1;
......@@ -140,6 +141,5 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
}
// close and rename file
taosCloseFile(&pFile);
taosMemoryFree(fname);
return 0;
}
......@@ -151,6 +151,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
pHandle->cachedSchemaSuid != pHandle->msgIter.suid) {
if (pHandle->pSchema) taosMemoryFree(pHandle->pSchema);
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
if (pHandle->pSchema == NULL) {
tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
......@@ -161,6 +162,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
}
// this interface use suid instead of uid
if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper);
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true);
if (pHandle->pSchemaWrapper == NULL) {
tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
......@@ -184,7 +186,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
while (colMeta < pSchemaWrapper->nCols) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}
......@@ -207,7 +209,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
colNeed++;
} else {
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}
......@@ -251,8 +253,8 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
}
return 0;
FAIL: // todo refactor here
// if (*ppCols) taosArrayDestroy(*ppCols);
FAIL: // todo refactor here
// if (*ppCols) taosArrayDestroy(*ppCols);
return -1;
}
......
......@@ -1341,7 +1341,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
}
if (rowRes != NULL) {
int32_t totalRows = pBlock->info.rows;
int32_t totalRows = pBlock->info.rows;
SSDataBlock* px = createOneDataBlock(pBlock, true);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
......@@ -4041,6 +4041,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
if(code){
pTaskInfo->code = code;
return NULL;
}
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
......@@ -4053,6 +4054,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
pTableListInfo->needSortTableByGroupId = true;
......@@ -4067,11 +4069,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = {
......@@ -4085,9 +4090,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
......@@ -4098,6 +4105,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
......@@ -4486,18 +4494,14 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql;
(*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
(*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList);
(*pTaskInfo)->pRoot =
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList);
if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code;
goto _complete;
}
if ((*pTaskInfo)->pRoot == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
}
return code;
_complete:
......
......@@ -1012,9 +1012,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
while (tqNextDataBlock(pInfo->streamBlockReader)) {
SSDataBlock block = {0};
uint64_t groupId = 0;
uint64_t uid = 0;
int32_t numOfRows = 0;
uint64_t groupId = 0;
uint64_t uid = 0;
int32_t numOfRows = 0;
// todo refactor
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader, &groupId, &uid, &numOfRows);
......@@ -1067,6 +1067,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
// TODO refactor @liao
taosArrayDestroy(block.pDataBlock);
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
pOperator->status = OP_EXEC_DONE;
......@@ -1106,12 +1109,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult) {
return pResult->info.rows > 0 ? pResult : NULL;
}
return NULL;
return pResult && pResult->info.rows > 0 ? pResult : NULL;
} else {
ASSERT(0);
return NULL;
......@@ -1387,7 +1389,8 @@ static SSDataBlock* buildSysTableMetaBlock() {
SSDataBlock* pBlock = createDataBlock();
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData = createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
SColumnInfoData colInfoData =
createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
blockDataAppendColInfo(pBlock, &colInfoData);
}
......
......@@ -79,7 +79,8 @@ static int32_t translateIn2NumOutDou(SFunctionNode* pFunc, char* pErrBuf, int32_
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type) || !IS_NUMERIC_TYPE(para2Type)) {
if ((!IS_NUMERIC_TYPE(para1Type) && !IS_NULL_TYPE(para1Type)) ||
(!IS_NUMERIC_TYPE(para2Type) && !IS_NULL_TYPE(para2Type))) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
......@@ -109,13 +110,13 @@ static int32_t translateLogarithm(SFunctionNode* pFunc, char* pErrBuf, int32_t l
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type)) {
if (!IS_NUMERIC_TYPE(para1Type) && !IS_NULL_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (2 == numOfParams) {
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para2Type)) {
if (!IS_NUMERIC_TYPE(para2Type) && !IS_NULL_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
......
......@@ -280,6 +280,27 @@ static void monGenVgroupJson(SMonInfo *pMonitor) {
}
}
static void monGenStbJson(SMonInfo *pMonitor) {
SMonStbInfo *pInfo = &pMonitor->mmInfo.stb;
if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return;
SJson *pJson = tjsonAddArrayToObject(pMonitor->pJson, "stb_infos");
if (pJson == NULL) return;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->stbs); ++i) {
SJson *pStbJson = tjsonCreateObject();
if (pStbJson == NULL) continue;
if (tjsonAddItemToArray(pJson, pStbJson) != 0) {
tjsonDelete(pStbJson);
continue;
}
SMonStbDesc *pStbDesc = taosArrayGet(pInfo->stbs, i);
tjsonAddStringToObject(pStbJson, "stb_name", pStbDesc->stb_name);
tjsonAddStringToObject(pStbJson, "database_name", pStbDesc->database_name);
}
}
static void monGenGrantJson(SMonInfo *pMonitor) {
SMonGrantInfo *pInfo = &pMonitor->mmInfo.grant;
if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return;
......@@ -527,6 +548,7 @@ void monSendReport() {
monGenBasicJson(pMonitor);
monGenClusterJson(pMonitor);
monGenVgroupJson(pMonitor);
monGenStbJson(pMonitor);
monGenGrantJson(pMonitor);
monGenDnodeJson(pMonitor);
monGenDiskJson(pMonitor);
......
......@@ -209,6 +209,32 @@ int32_t tDecodeSMonVgroupInfo(SDecoder *decoder, SMonVgroupInfo *pInfo) {
return 0;
}
int32_t tEncodeSMonStbInfo(SEncoder *encoder, const SMonStbInfo *pInfo) {
if (tEncodeI32(encoder, taosArrayGetSize(pInfo->stbs)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->stbs); ++i) {
SMonStbDesc *pDesc = taosArrayGet(pInfo->stbs, i);
if (tEncodeCStr(encoder, pDesc->stb_name) < 0) return -1;
if (tEncodeCStr(encoder, pDesc->database_name) < 0) return -1;
}
return 0;
}
int32_t tDecodeSMonStbInfo(SDecoder *decoder, SMonStbInfo *pInfo) {
int32_t arraySize = 0;
if (tDecodeI32(decoder, &arraySize) < 0) return -1;
pInfo->stbs = taosArrayInit(arraySize, sizeof(SMonStbDesc));
if (pInfo->stbs == NULL) return -1;
for (int32_t i = 0; i < arraySize; ++i) {
SMonStbDesc desc = {0};
if (tDecodeCStrTo(decoder, desc.stb_name) < 0) return -1;
if (tDecodeCStrTo(decoder, desc.database_name) < 0) return -1;
taosArrayPush(pInfo->stbs, &desc);
}
return 0;
}
int32_t tEncodeSMonGrantInfo(SEncoder *encoder, const SMonGrantInfo *pInfo) {
if (tEncodeI32(encoder, pInfo->expire_time) < 0) return -1;
if (tEncodeI64(encoder, pInfo->timeseries_used) < 0) return -1;
......@@ -230,6 +256,7 @@ int32_t tSerializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) {
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSMonClusterInfo(&encoder, &pInfo->cluster) < 0) return -1;
if (tEncodeSMonVgroupInfo(&encoder, &pInfo->vgroup) < 0) return -1;
if (tEncodeSMonStbInfo(&encoder, &pInfo->stb) < 0) return -1;
if (tEncodeSMonGrantInfo(&encoder, &pInfo->grant) < 0) return -1;
if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1;
if (tEncodeSMonLogs(&encoder, &pInfo->log) < 0) return -1;
......@@ -247,6 +274,7 @@ int32_t tDeserializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) {
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSMonClusterInfo(&decoder, &pInfo->cluster) < 0) return -1;
if (tDecodeSMonVgroupInfo(&decoder, &pInfo->vgroup) < 0) return -1;
if (tDecodeSMonStbInfo(&decoder, &pInfo->stb) < 0) return -1;
if (tDecodeSMonGrantInfo(&decoder, &pInfo->grant) < 0) return -1;
if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1;
if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1;
......@@ -261,9 +289,11 @@ void tFreeSMonMmInfo(SMonMmInfo *pInfo) {
taosArrayDestroy(pInfo->cluster.mnodes);
taosArrayDestroy(pInfo->cluster.dnodes);
taosArrayDestroy(pInfo->vgroup.vgroups);
taosArrayDestroy(pInfo->stb.stbs);
pInfo->cluster.mnodes = NULL;
pInfo->cluster.dnodes = NULL;
pInfo->vgroup.vgroups = NULL;
pInfo->stb.stbs = NULL;
pInfo->log.logs = NULL;
}
......
......@@ -1061,6 +1061,7 @@ static EDealRes partTagsOptRebuildTbanmeImpl(SNode** pNode, void* pContext) {
}
strcpy(pFunc->functionName, "tbname");
pFunc->funcType = FUNCTION_TYPE_TBNAME;
pFunc->node.resType = ((SColumnNode*)*pNode)->node.resType;
nodesDestroyNode(*pNode);
*pNode = (SNode*)pFunc;
return DEAL_RES_IGNORE_CHILD;
......
......@@ -36,9 +36,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
SColumnInfoData *pOutputData = pOutput->columnData;
int32_t type = GET_PARAM_TYPE(pInput);
if (!IS_NUMERIC_TYPE(type)) {
return TSDB_CODE_FAILED;
}
switch (type) {
case TSDB_DATA_TYPE_FLOAT: {
......@@ -119,6 +116,13 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
break;
}
case TSDB_DATA_TYPE_NULL: {
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendNULL(pOutputData, i);
}
break;
}
default: {
colDataAssign(pOutputData, pInputData, pInput->numOfRows, NULL);
}
......@@ -130,9 +134,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
return TSDB_CODE_FAILED;
}
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
......@@ -142,7 +143,7 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS
double *out = (double *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
if (colDataIsNull_s(pInputData, i) || IS_NULL_TYPE(type)) {
colDataAppendNULL(pOutputData, i);
continue;
}
......@@ -159,10 +160,6 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS
}
static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
return TSDB_CODE_FAILED;
}
SColumnInfoData *pInputData[2];
SColumnInfoData *pOutputData = pOutput->columnData;
_getDoubleValue_fn_t getValueFn[2];
......@@ -175,11 +172,15 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
double *out = (double *)pOutputData->pData;
double result;
bool hasNullType = (IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[0])) ||
IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[1])));
int32_t numOfRows = TMAX(pInput[0].numOfRows, pInput[1].numOfRows);
if (pInput[0].numOfRows == pInput[1].numOfRows) {
for (int32_t i = 0; i < numOfRows; ++i) {
if (colDataIsNull_s(pInputData[0], i) ||
colDataIsNull_s(pInputData[1], i)) {
colDataIsNull_s(pInputData[1], i) ||
hasNullType) {
colDataAppendNULL(pOutputData, i);
continue;
}
......@@ -191,7 +192,7 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
}
}
} else if (pInput[0].numOfRows == 1) { //left operand is constant
if (colDataIsNull_s(pInputData[0], 0)) {
if (colDataIsNull_s(pInputData[0], 0) || hasNullType) {
colDataAppendNNULL(pOutputData, 0, pInput[1].numOfRows);
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
......@@ -210,7 +211,7 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
}
}
} else if (pInput[1].numOfRows == 1) {
if (colDataIsNull_s(pInputData[1], 0)) {
if (colDataIsNull_s(pInputData[1], 0) || hasNullType) {
colDataAppendNNULL(pOutputData, 0, pInput[0].numOfRows);
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
......@@ -236,9 +237,6 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
return TSDB_CODE_FAILED;
}
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
......@@ -272,6 +270,13 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
break;
}
case TSDB_DATA_TYPE_NULL: {
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendNULL(pOutputData, i);
}
break;
}
default: {
colDataAssign(pOutputData, pInputData, pInput->numOfRows, NULL);
}
......
......@@ -34,7 +34,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
......
......@@ -172,7 +172,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
int32_t numOfCols = (int32_t) taosArrayGetSize(pBlock->pDataBlock);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = 0;
......@@ -185,7 +185,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
return 0;
}
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
void* buf = NULL;
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(data->blocks);
......@@ -307,6 +307,8 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(pBlock);
tmsgSendReq(pEpSet, &dispatchMsg);
return 0;
......
......@@ -53,9 +53,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
}
// TODO: do we need free memory?
SSDataBlock* outputCopy = createOneDataBlock(output, true);
outputCopy->info.childId = pTask->selfChildId;
taosArrayPush(pRes, outputCopy);
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
/*SSDataBlock* outputCopy = createOneDataBlock(output, true);*/
/*outputCopy->info.childId = pTask->selfChildId;*/
/*taosArrayPush(pRes, outputCopy);*/
}
return 0;
}
......@@ -68,6 +72,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
streamTaskExecImpl(pTask, data, pRes);
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
return NULL;
}
......@@ -82,25 +87,25 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(qRes);
return NULL;
}
int8_t type = ((SStreamQueueItem*)data)->type;
if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
}
streamQueueProcessSuccess(pTask->inputQueue);
return taosArrayInit(0, sizeof(SSDataBlock));
pRes = taosArrayInit(0, sizeof(SSDataBlock));
}
int8_t type = ((SStreamQueueItem*)data)->type;
if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
}
}
return pRes;
......@@ -125,14 +130,14 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return 0;
} else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
continue;
} else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
ASSERT(taosArrayGetSize(pRes) == 0);
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
return 0;
} else {
ASSERT(0);
......
......@@ -28,13 +28,13 @@ extern "C" {
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 500
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 100
#define HEARTBEAT_TIMER_MS 900
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
......
......@@ -26,11 +26,13 @@ extern "C" {
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
#include "wal.h"
typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode;
SWal* pWal;
SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
SSyncNode* pSyncNode;
SWal* pWal;
SWalReadHandle* pWalHandle;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
} SSyncLogStoreData;
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
......
......@@ -420,44 +420,26 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// prevLogIndex == -1
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
if (gRaftDetailLog) {
sTrace("syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld", pMsg->prevLogIndex);
}
return true;
}
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex > myLastIndex) {
if (gRaftDetailLog) {
sTrace("syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld", pMsg->prevLogIndex,
myLastIndex);
}
sDebug("vgId:%d sync log not ok, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (myPreLogTerm == SYNC_TERM_INVALID) {
sError("vgId:%d sync get pre term error, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
sDebug("vgId:%d sync log not ok2, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
if (gRaftDetailLog) {
sTrace(
"syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
"myPreLogTerm:%lu",
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
}
return true;
}
if (gRaftDetailLog) {
sTrace(
"syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
"myPreLogTerm:%lu",
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
}
sDebug("vgId:%d sync log not ok3, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
......@@ -466,14 +448,11 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
int32_t code = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, vgId:%d, term:%lu", ths->vgId,
ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
syncAppendEntriesLog2("==syncNodeOnAppendEntriesSnapshotCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncAppendEntries maybe replica already dropped");
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
return ret;
}
......@@ -497,7 +476,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) {
sTrace("recv SyncAppendEntries, candidate to follower");
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply?
......@@ -505,6 +484,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
}
} while (0);
#if 0
// fake match
//
// condition1:
......@@ -530,7 +510,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
syncNodeHasSnapshot(ths);
bool condition1 =
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex);
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex); // donot use syncLogEntryCount!!! use isEmpty
bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) &&
(pMsg->prevLogIndex > myLastIndex);
bool condition3 = condition0 && (pMsg->prevLogIndex < snapshot.lastApplyIndex);
......@@ -538,11 +518,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2 || condition3 || condition4;
if (condition) {
sTrace(
"recv SyncAppendEntries, fake match, myLastIndex:%ld, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, "
"condition1:%d, condition2:%d, condition3:%d, condition4:%d",
myLastIndex, ths->pLogStore->syncLogBeginIndex(ths->pLogStore),
ths->pLogStore->syncLogEndIndex(ths->pLogStore), condition1, condition2, condition3, condition4);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, fake match, pre-index:%ld, pre-term:%lu",
pMsg->prevLogIndex, pMsg->prevLogTerm);
syncNodeEventLog(ths, logBuf);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......@@ -562,6 +541,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
}
} while (0);
#endif
// fake match2
//
......@@ -576,8 +556,13 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex);
if (condition) {
sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex,
ths->commitIndex);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, fake match2, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex,
pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0;
......@@ -605,6 +590,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
// update match index
matchIndex = pMsg->prevLogIndex + 1;
syncEntryDestory(pAppendEntry);
......@@ -650,11 +636,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2;
if (condition) {
sTrace(
"recv SyncAppendEntries, not match, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, condition1:%d, "
"condition2:%d, logOK:%d",
ths->pLogStore->syncLogBeginIndex(ths->pLogStore), ths->pLogStore->syncLogEndIndex(ths->pLogStore),
condition1, condition2, logOK);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......@@ -693,8 +678,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
sTrace("recv SyncAppendEntries, match, myLastIndex:%ld, hasExtraEntries:%d, hasAppendEntries:%d", myLastIndex,
hasExtraEntries, hasAppendEntries);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
if (hasExtraEntries) {
// make log same, rollback deleted entries
......
......@@ -101,32 +101,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
int32_t ret = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, vgId:%d, term:%lu", ths->vgId,
ths->pRaftStore->currentTerm);
syncAppendEntriesReplyLog2(logBuf, pMsg);
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncAppendEntriesReply, maybe replica already dropped");
return ret;
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
return 0;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term,
ths->pRaftStore->currentTerm);
return ret;
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term);
syncNodeEventLog(ths, logBuf);
return 0;
}
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
}
syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
......@@ -134,12 +129,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, error term, receive_term:%lu current_term:%lu",
pMsg->term, ths->pRaftStore->currentTerm);
syncNodeLog2(logBuf, ths);
sError("%s", logBuf);
return ret;
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term);
syncNodeErrorLog(ths, logBuf);
return -1;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
......@@ -228,8 +221,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
}
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
}
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
return ret;
}
\ No newline at end of file
......@@ -416,7 +416,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
}
}
sMeta->lastConfigIndex = lastIndex;
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lastConfigIndex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sMeta->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -433,8 +433,9 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
}
}
sTrace("vgId:%d, sync get snapshot last config index, index:%ld lcindex:%ld", pSyncNode->vgId, snapshotLastApplyIndex,
lastIndex);
sTrace("sync syncNodeGetSnapshotConfigIndex index:%ld lastConfigIndex:%ld", snapshotLastApplyIndex, lastIndex);
return lastIndex;
}
......@@ -1310,6 +1311,10 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
char* printStr = "";
if (pCfgStr != NULL) {
printStr = pCfgStr;
}
if (userStrLen < 256) {
char logBuf[256 + 256];
......@@ -1321,7 +1326,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pCfgStr);
pSyncNode->changing, printStr);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
......@@ -1338,7 +1343,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pCfgStr);
pSyncNode->changing, printStr);
} else {
snprintf(s, len, "%s", str);
}
......@@ -1957,17 +1962,21 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
taosMemoryFree(pPreEntry);
return preTerm;
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm;
}
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm;
}
}
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "sync node get pre term error, index:%ld", index);
syncNodeErrorLog(pSyncNode, logBuf);
} while (0);
return SYNC_TERM_INVALID;
}
......
......@@ -107,26 +107,30 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
}
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
int32_t len = 512;
char *s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
char *p = s + strlen(s);
for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
/*
if (p + 128 + 32 > s + len) {
break;
if (pSyncCfg != NULL) {
int32_t len = 512;
char *s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
char *p = s + strlen(s);
for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
/*
if (p + 128 + 32 > s + len) {
break;
}
*/
char buf[128 + 32];
snprintf(buf, sizeof(buf), "%s:%d, ", pSyncCfg->nodeInfo[i].nodeFqdn, pSyncCfg->nodeInfo[i].nodePort);
strncpy(p, buf, sizeof(buf));
p = s + strlen(s);
}
*/
char buf[128 + 32];
snprintf(buf, sizeof(buf), "%s:%d, ", pSyncCfg->nodeInfo[i].nodeFqdn, pSyncCfg->nodeInfo[i].nodePort);
strncpy(p, buf, sizeof(buf));
p = s + strlen(s);
strcpy(p - 2, "}");
return s;
}
strcpy(p - 2, "}");
return s;
return NULL;
}
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
......
......@@ -16,10 +16,10 @@
#include "syncRaftLog.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "wal.h"
// refactor, log[0 .. n] ==> log[m .. n]
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
// static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
......@@ -45,31 +45,43 @@ static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncI
static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
// refactor, log[0 .. n] ==> log[m .. n]
/*
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
sTrace("raftLogSetBeginIndex beginIndex:%ld", beginIndex);
// if beginIndex == 0, donot need call this funciton
ASSERT(beginIndex > 0);
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
sTrace("vgId:%d, reset wal begin index:%ld", pData->pSyncNode->vgId, beginIndex);
pData->beginIndex = beginIndex;
walRestoreFromSnapshot(pWal, beginIndex - 1);
return 0;
}
*/
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
ASSERT(snapshotIndex >= 0);
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walRestoreFromSnapshot(pWal, snapshotIndex);
return 0;
}
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return pData->beginIndex;
SyncIndex firstVer = walGetFirstVer(pWal);
return firstVer;
}
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) { return raftLogLastIndex(pLogStore); }
static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
return (endIndex < beginIndex);
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return walIsEmpty(pWal);
}
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
......@@ -96,23 +108,8 @@ static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastVer = walGetLastVer(pWal);
SyncIndex firstVer = walGetFirstVer(pWal);
if (lastVer < firstVer) {
// no record
lastIndex = -1;
} else {
if (firstVer >= 0) {
lastIndex = lastVer;
} else if (firstVer == -1) {
lastIndex = -1;
} else {
ASSERT(0);
}
}
return lastIndex;
return lastVer;
}
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
......@@ -122,6 +119,26 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
return lastVer + 1;
}
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
if (walIsEmpty(pWal)) {
return 0;
} else {
SSyncRaftEntry* pLastEntry;
int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry);
ASSERT(code == 0);
ASSERT(pLastEntry != NULL);
SyncTerm lastTerm = pLastEntry->term;
taosMemoryFree(pLastEntry);
return lastTerm;
}
return 0;
}
/*
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
if (raftLogEntryCount(pLogStore) == 0) {
......@@ -137,6 +154,7 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
}
return lastTerm;
}
*/
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
......@@ -160,8 +178,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
pEntry->index, err, err, errStr, sysErr, sysErrStr);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
}
......@@ -229,7 +251,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
*ppEntry = NULL;
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) {
return -1;
}
......@@ -240,12 +263,23 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index,
err, err, errStr, sysErr, sysErrStr);
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err,
err, errStr, sysErr, sysErrStr);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
syncNodeEventLog(pData->pSyncNode, logBuf);
} else {
syncNodeErrorLog(pData->pSyncNode, logBuf);
}
} while (0);
/*
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
*/
return code;
}
......@@ -261,9 +295,11 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
/*
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
*/
return code;
}
......@@ -285,6 +321,25 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
return code;
}
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
ASSERT(ppLastEntry != NULL);
*ppLastEntry = NULL;
if (walIsEmpty(pWal)) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
} else {
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code;
}
return -1;
}
/*
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
*ppLastEntry = NULL;
if (raftLogEntryCount(pLogStore) == 0) {
......@@ -294,6 +349,7 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code;
}
*/
//-------------------------------
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
......@@ -306,16 +362,22 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal;
ASSERT(pData->pWal != NULL);
SyncIndex firstVer = walGetFirstVer(pData->pWal);
SyncIndex lastVer = walGetLastVer(pData->pWal);
if (firstVer >= 0) {
pData->beginIndex = firstVer;
} else if (firstVer == -1) {
pData->beginIndex = lastVer + 1;
} else {
ASSERT(0);
}
pData->pWalHandle = walOpenReadHandle(pData->pWal);
ASSERT(pData->pWalHandle != NULL);
/*
SyncIndex firstVer = walGetFirstVer(pData->pWal);
SyncIndex lastVer = walGetLastVer(pData->pWal);
if (firstVer >= 0) {
pData->beginIndex = firstVer;
} else if (firstVer == -1) {
pData->beginIndex = lastVer + 1;
} else {
ASSERT(0);
}
*/
pLogStore->appendEntry = logStoreAppendEntry;
pLogStore->getEntry = logStoreGetEntry;
......@@ -325,7 +387,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
pLogStore->getCommitIndex = logStoreGetCommitIndex;
pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
// pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot;
pLogStore->syncLogBeginIndex = raftLogBeginIndex;
pLogStore->syncLogEndIndex = raftLogEndIndex;
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
......@@ -344,6 +407,11 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
void logStoreDestory(SSyncLogStore* pLogStore) {
if (pLogStore != NULL) {
SSyncLogStoreData* pData = pLogStore->data;
if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle);
}
taosMemoryFree(pLogStore->data);
taosMemoryFree(pLogStore);
}
......@@ -368,8 +436,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
pEntry->index, err, err, errStr, sysErr, sysErrStr);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
}
......@@ -389,7 +460,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SWal* pWal = pData->pWal;
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL);
int32_t code = walReadWithHandle(pWalHandle, index);
......@@ -398,12 +470,20 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
index, err, err, errStr, sysErr, sysErrStr);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
err, err, errStr, sysErr, sysErrStr);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
syncNodeEventLog(pData->pSyncNode, logBuf);
} else {
syncNodeErrorLog(pData->pSyncNode, logBuf);
}
} while (0);
ASSERT(0);
}
// ASSERT(walReadWithHandle(pWalHandle, index) == 0);
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
ASSERT(pEntry != NULL);
......@@ -417,9 +497,11 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
/*
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
*/
return pEntry;
......@@ -498,6 +580,7 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
return pEntry;
}
/*
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
char u64buf[128] = {0};
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
......@@ -544,6 +627,57 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
return pJson;
}
*/
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
char u64buf[128] = {0};
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
cJSON* pRoot = cJSON_CreateObject();
if (pData != NULL && pData->pWal != NULL) {
snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
cJSON_AddStringToObject(pRoot, "endIndex", u64buf);
int32_t count = raftLogEntryCount(pLogStore);
cJSON_AddNumberToObject(pRoot, "entryCount", count);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore));
cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
cJSON* pEntries = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
if (!raftLogIsEmpty(pLogStore)) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
syncEntryDestory(pEntry);
}
}
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
return pJson;
}
char* logStore2Str(SSyncLogStore* pLogStore) {
cJSON* pJson = logStore2Json(pLogStore);
......@@ -563,7 +697,8 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
......
......@@ -219,6 +219,17 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
do {
char host[128];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug(
"vgId:%d, send sync-append-entries to %s:%d, term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, commit:%ld, "
"datalen:%d",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen);
} while (0);
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
......
......@@ -545,7 +545,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
// pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex);
// maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
......
......@@ -113,7 +113,8 @@ void test2() {
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
if (gAssert) {
......@@ -228,7 +229,8 @@ void test4() {
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
......@@ -289,7 +291,8 @@ void test5() {
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
......@@ -363,7 +366,8 @@ void test6() {
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
// pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
......@@ -405,14 +409,32 @@ void test6() {
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
do {
SyncIndex firstVer = walGetFirstVer(pWal);
SyncIndex lastVer = walGetLastVer(pWal);
bool isEmpty = walIsEmpty(pWal);
printf("before -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty);
} while (0);
logStoreDestory(pLogStore);
cleanup();
// restart
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
do {
SyncIndex firstVer = walGetFirstVer(pWal);
SyncIndex lastVer = walGetLastVer(pWal);
bool isEmpty = walIsEmpty(pWal);
printf("after -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty);
} while (0);
logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore);
if (gAssert) {
......@@ -432,17 +454,20 @@ void test6() {
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE;
gRaftDetailLog = true;
if (argc == 2) {
gAssert = atoi(argv[1]);
}
sTrace("gAssert : %d", gAssert);
/*
test1();
test2();
test3();
test4();
test5();
*/
test6();
return 0;
......
......@@ -312,7 +312,8 @@ void test5() {
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
//pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, 5);
for (int i = 6; i <= 10; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
......@@ -372,6 +373,7 @@ void test5() {
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE;
gRaftDetailLog = true;
if (argc == 2) {
gAssert = atoi(argv[1]);
......
......@@ -272,14 +272,23 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
}
}
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pRead->pWal->vers.firstVer,
pRead->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else
else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
}
return -1;
}
......@@ -304,8 +313,10 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead->pHead->head.bodyLen) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else
else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
}
return -1;
}
......
......@@ -150,6 +150,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(code == 0);
if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
if (head.head.version != ver) {
......
......@@ -21,45 +21,39 @@ class ClusterDnodes(TDDnodes):
self.testCluster = False
self.valgrind = 0
self.killValgrind = 1
self.independent = True
self.dnodeNums = 5
# def getTDDnodes(dnodeNums):
# return
class ConfigureyCluster:
"""configure dnodes and return TDDnodes list, it can """
"""This will create defined number of dnodes and create a cluset.
at the same time, it will return TDDnodes list: dnodes, """
hostname= socket.gethostname()
def __init__(self):
self.dnodes = None
self.dnodes_nums = 5
self.dnodes = []
self.dnodeNums = 5
self.independent = True
self.start_port = 6030
self.startPort = 6030
self.portStep = 100
hostname1= socket.gethostname()
self.mnodeNums = 0
def configure_cluster(self ,dnodes_nums=5,independent=True,start_port=6030,portStep=100,hostname="%s"%hostname1):
self.start_port=int(start_port)
def configure_cluster(self ,dnodeNums=5,mnodeNums=0,startPort=6030,portStep=100,hostname="%s"%hostname):
self.startPort=int(startPort)
self.portStep=int(portStep)
self.hostname=hostname
self.dnodes_nums = int(dnodes_nums)
self.independent = independent
self.dnodeNums = int(dnodeNums)
self.mnodeNums = int(mnodeNums)
self.dnodes = []
start_port_sec = 6130
for num in range(1, (self.dnodes_nums+1)):
startPort_sec = int(startPort+portStep)
for num in range(1, (self.dnodeNums+1)):
dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{self.start_port}")
dnode.addExtraCfg("firstEp", f"{hostname}:{self.startPort}")
dnode.addExtraCfg("fqdn", f"{hostname}")
dnode.addExtraCfg("serverPort", f"{self.start_port + (num-1)*self.portStep}")
# dnode.addExtraCfg("monitorFqdn", hostname)
# dnode.addExtraCfg("monitorPort", 7043)
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
# configure three dnoe don't support vnodes
if self.dnodes_nums > 4 :
if self.independent and (num < 4):
dnode.addExtraCfg("supportVnodes", 0)
dnode.addExtraCfg("serverPort", f"{self.startPort + (num-1)*self.portStep}")
dnode.addExtraCfg("secondEp", f"{hostname}:{startPort_sec}")
# configure dnoe of independent mnodes
if num <= self.mnodeNums and self.mnodeNums != 0 :
dnode.addExtraCfg("supportVnodes", 0)
# print(dnode)
self.dnodes.append(dnode)
return self.dnodes
......@@ -69,24 +63,7 @@ class ConfigureyCluster:
for dnode in self.dnodes[1:]:
# print(dnode.cfgDict)
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
print(dnode_id)
tdSql.execute(" create dnode '%s';"%dnode_id)
# count=0
# while count < 10:
# time.sleep(1)
# tdSql.query("show dnodes;")
# if tdSql.checkRows(self.dnodes_nums) :
# print("mnode is three nodes")
# if tdSql.queryResult[0][4]=='leader' :
# if tdSql.queryResult[2][4]=='offline':
# if tdSql.queryResult[1][2]=='follower':
# print("stop mnodes on dnode 3 successfully in 10s")
# break
# count+=1
# else:
# print("stop mnodes on dnode 3 failed in 10s")
# return -1
checkstatus=False
......@@ -97,206 +74,19 @@ class ConfigureyCluster:
tdSql.query("show dnodes")
# tdLog.debug(tdSql.queryResult)
status=0
for i in range(self.dnodes_nums):
for i in range(self.dnodeNums):
if tdSql.queryResult[i][4] == "ready":
status+=1
tdLog.debug(status)
# tdLog.debug(status)
if status == self.dnodes_nums:
tdLog.debug(" create cluster with %d dnode and check cluster dnode all ready within 5s! " %self.dnodes_nums)
if status == self.dnodeNums:
tdLog.debug(" create cluster with %d dnode and check cluster dnode all ready within 5s! " %self.dnodeNums)
break
count+=1
time.sleep(1)
else:
tdLog.debug("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodes_nums)
tdLog.debug("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodeNums)
return -1
cluster = ConfigureyCluster()
# def start(self ,dnodes_nums):
# self.TDDnodes = ClusterDnodes(dnodes)
# self.TDDnodes.init("")
# self.TDDnodes.setTestCluster(testCluster)
# self.TDDnodes.setValgrind(valgrind)
# self.TDDnodes.stopAll()
# for dnode in self.TDDnodes.dnodes:
# self.TDDnodes.deploy(dnode.index,{})
# for dnode in self.TDDnodes.dnodes:
# self.TDDnodes.starttaosd(dnode.index)
# # create cluster
# for dnode in self.TDDnodes.dnodes[1:]:
# # print(dnode.cfgDict)
# dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
# dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
# dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
# cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;"
# print(cmd)
# os.system(cmd)
# time.sleep(2)
# tdLog.info(" create cluster with %d dnode done! " %dnodes_nums)
# def buildcluster(self,dnodenumber):
# self.depoly_cluster(dnodenumber)
# self.master_dnode = self.TDDnodes.dnodes[0]
# self.host=self.master_dnode.cfgDict["fqdn"]
# conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
# tdSql.init(conn1.cursor())
# def checkdnodes(self,dnodenumber):
# count=0
# while count < 10:
# time.sleep(1)
# statusReadyBumber=0
# tdSql.query("show dnodes;")
# if tdSql.checkRows(dnodenumber) :
# print("dnode is %d nodes"%dnodenumber)
# for i in range(dnodenumber):
# if tdSql.queryResult[i][4] !='ready' :
# status=tdSql.queryResult[i][4]
# print("dnode:%d status is %s "%(i,status))
# break
# else:
# statusReadyBumber+=1
# print(statusReadyBumber)
# if statusReadyBumber == dnodenumber :
# print("all of %d mnodes is ready in 10s "%dnodenumber)
# return True
# break
# count+=1
# else:
# print("%d mnodes is not ready in 10s "%dnodenumber)
# return False
# def check3mnode(self):
# count=0
# while count < 10:
# time.sleep(1)
# tdSql.query("show mnodes;")
# if tdSql.checkRows(3) :
# print("mnode is three nodes")
# if tdSql.queryResult[0][2]=='leader' :
# if tdSql.queryResult[1][2]=='follower':
# if tdSql.queryResult[2][2]=='follower':
# print("three mnodes is ready in 10s")
# break
# elif tdSql.queryResult[0][2]=='follower' :
# if tdSql.queryResult[1][2]=='leader':
# if tdSql.queryResult[2][2]=='follower':
# print("three mnodes is ready in 10s")
# break
# elif tdSql.queryResult[0][2]=='follower' :
# if tdSql.queryResult[1][2]=='follower':
# if tdSql.queryResult[2][2]=='leader':
# print("three mnodes is ready in 10s")
# break
# count+=1
# else:
# print("three mnodes is not ready in 10s ")
# return -1
# tdSql.query("show mnodes;")
# tdSql.checkRows(3)
# tdSql.checkData(0,1,'%s:6030'%self.host)
# tdSql.checkData(0,3,'ready')
# tdSql.checkData(1,1,'%s:6130'%self.host)
# tdSql.checkData(1,3,'ready')
# tdSql.checkData(2,1,'%s:6230'%self.host)
# tdSql.checkData(2,3,'ready')
# def check3mnode1off(self):
# count=0
# while count < 10:
# time.sleep(1)
# tdSql.query("show mnodes;")
# if tdSql.checkRows(3) :
# print("mnode is three nodes")
# if tdSql.queryResult[0][2]=='offline' :
# if tdSql.queryResult[1][2]=='leader':
# if tdSql.queryResult[2][2]=='follower':
# print("stop mnodes on dnode 2 successfully in 10s")
# break
# elif tdSql.queryResult[1][2]=='follower':
# if tdSql.queryResult[2][2]=='leader':
# print("stop mnodes on dnode 2 successfully in 10s")
# break
# count+=1
# else:
# print("stop mnodes on dnode 2 failed in 10s ")
# return -1
# tdSql.error("drop mnode on dnode 1;")
# tdSql.query("show mnodes;")
# tdSql.checkRows(3)
# tdSql.checkData(0,1,'%s:6030'%self.host)
# tdSql.checkData(0,2,'offline')
# tdSql.checkData(0,3,'ready')
# tdSql.checkData(1,1,'%s:6130'%self.host)
# tdSql.checkData(1,3,'ready')
# tdSql.checkData(2,1,'%s:6230'%self.host)
# tdSql.checkData(2,3,'ready')
# def check3mnode2off(self):
# count=0
# while count < 40:
# time.sleep(1)
# tdSql.query("show mnodes;")
# if tdSql.checkRows(3) :
# print("mnode is three nodes")
# if tdSql.queryResult[0][2]=='leader' :
# if tdSql.queryResult[1][2]=='offline':
# if tdSql.queryResult[2][2]=='follower':
# print("stop mnodes on dnode 2 successfully in 10s")
# break
# count+=1
# else:
# print("stop mnodes on dnode 2 failed in 10s ")
# return -1
# tdSql.error("drop mnode on dnode 2;")
# tdSql.query("show mnodes;")
# tdSql.checkRows(3)
# tdSql.checkData(0,1,'%s:6030'%self.host)
# tdSql.checkData(0,2,'leader')
# tdSql.checkData(0,3,'ready')
# tdSql.checkData(1,1,'%s:6130'%self.host)
# tdSql.checkData(1,2,'offline')
# tdSql.checkData(1,3,'ready')
# tdSql.checkData(2,1,'%s:6230'%self.host)
# tdSql.checkData(2,2,'follower')
# tdSql.checkData(2,3,'ready')
# def check3mnode3off(self):
# count=0
# while count < 10:
# time.sleep(1)
# tdSql.query("show mnodes;")
# if tdSql.checkRows(3) :
# print("mnode is three nodes")
# if tdSql.queryResult[0][2]=='leader' :
# if tdSql.queryResult[2][2]=='offline':
# if tdSql.queryResult[1][2]=='follower':
# print("stop mnodes on dnode 3 successfully in 10s")
# break
# count+=1
# else:
# print("stop mnodes on dnode 3 failed in 10s")
# return -1
# tdSql.error("drop mnode on dnode 3;")
# tdSql.query("show mnodes;")
# tdSql.checkRows(3)
# tdSql.checkData(0,1,'%s:6030'%self.host)
# tdSql.checkData(0,2,'leader')
# tdSql.checkData(0,3,'ready')
# tdSql.checkData(1,1,'%s:6130'%self.host)
# tdSql.checkData(1,2,'follower')
# tdSql.checkData(1,3,'ready')
# tdSql.checkData(2,1,'%s:6230'%self.host)
# tdSql.checkData(2,2,'offline')
# tdSql.checkData(2,3,'ready')
cluster = ConfigureyCluster()
\ No newline at end of file
......@@ -12,6 +12,8 @@ sql connect
print =============== show dnodes
sleep 2000
sql create database db vgroups 2;
sql use db;
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd";
sleep 2000
print =============== create drop qnode 1
......
......@@ -159,13 +159,10 @@ class TDTestCase:
tdSql.query("show mnodes;")
tdSql.checkRows(3)
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(0,2,'leader')
tdSql.checkData(0,3,'ready')
tdSql.checkData(1,1,'%s:6130'%self.host)
tdSql.checkData(1,2,'offline')
tdSql.checkData(1,3,'ready')
tdSql.checkData(2,1,'%s:6230'%self.host)
tdSql.checkData(2,2,'follower')
tdSql.checkData(2,3,'ready')
def check3mnode3off(self):
......@@ -265,13 +262,6 @@ class TDTestCase:
while stopcount <= 2:
for i in range(dnodenumber):
tdDnodes[i].stoptaosd()
# if i == 1 :
# self.check3mnode2off()
# elif i == 2 :
# self.check3mnode3off()
# elif i == 0:
# self.check3mnode1off()
tdDnodes[i].starttaosd()
# self.check3mnode()
stopcount+=1
......
from ntpath import join
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.dnodes = 5
self.mnodes = 3
self.idIndex = 0
self.roleIndex = 2
self.mnodeStatusIndex = 3
self.mnodeEpIndex = 1
self.dnodeStatusIndex = 4
self.mnodeCheckCnt = 10
self.host = socket.gethostname()
self.startPort = 6030
self.portStep = 100
self.dnodeOfLeader = 0
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkDnodesStatusAndCreateMnode(self,dnodeNumbers):
count=0
while count < dnodeNumbers:
tdSql.query("show dnodes")
# tdLog.debug(tdSql.queryResult)
dCnt = 0
for i in range(dnodeNumbers):
if tdSql.queryResult[i][self.dnodeStatusIndex] != "ready":
break
else:
dCnt += 1
if dCnt == dnodeNumbers:
break
time.sleep(1)
tdLog.debug("............... waiting for all dnodes ready!")
tdLog.info("==============create two new mnodes ========")
tdSql.execute("create mnode on dnode 2")
tdSql.execute("create mnode on dnode 3")
self.check3mnode()
return
def check3mnode(self):
count=0
while count < self.mnodeCheckCnt:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(self.mnodes) :
tdLog.debug("mnode is three nodes")
else:
tdLog.exit("mnode number is correct")
roleOfMnode0 = tdSql.queryResult[0][self.roleIndex]
roleOfMnode1 = tdSql.queryResult[1][self.roleIndex]
roleOfMnode2 = tdSql.queryResult[2][self.roleIndex]
if roleOfMnode0=='leader' and roleOfMnode1=='follower' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
else:
count+=1
else:
tdLog.exit("three mnodes is not ready in 10s ")
tdSql.query("show mnodes;")
tdSql.checkRows(self.mnodes)
tdSql.checkData(0,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort))
tdSql.checkData(0,self.mnodeStatusIndex,'ready')
tdSql.checkData(1,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep))
tdSql.checkData(1,self.mnodeStatusIndex,'ready')
tdSql.checkData(2,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep*2))
tdSql.checkData(2,self.mnodeStatusIndex,'ready')
def check3mnode1off(self):
count=0
while count < self.mnodeCheckCnt:
time.sleep(1)
tdSql.query("show mnodes")
tdLog.debug(tdSql.queryResult)
# if tdSql.checkRows(self.mnodes) :
# tdLog.debug("mnode is three nodes")
# else:
# tdLog.exit("mnode number is correct")
roleOfMnode0 = tdSql.queryResult[0][self.roleIndex]
roleOfMnode1 = tdSql.queryResult[1][self.roleIndex]
roleOfMnode2 = tdSql.queryResult[2][self.roleIndex]
if roleOfMnode0=='offline' :
if roleOfMnode1=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
elif roleOfMnode1=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
elif roleOfMnode1=='offline' :
if roleOfMnode0=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
elif roleOfMnode2=='offline' :
if roleOfMnode0=='leader' and roleOfMnode1 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
count+=1
else:
tdLog.exit("three mnodes is not ready in 10s ")
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}, {'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("async insert data")
pThread = tmqCom.asyncInsertData(paraDict)
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the notify info of start consume")
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("start switch mnode ................")
tdDnodes = cluster.dnodes
tdLog.info("1. stop dnode 0")
tdDnodes[0].stoptaosd()
time.sleep(10)
self.check3mnode1off()
tdLog.info("2. start dnode 0")
tdDnodes[0].starttaosd()
self.check3mnode()
tdLog.info("3. stop dnode 1")
tdDnodes[1].stoptaosd()
time.sleep(10)
self.check3mnode1off()
tdLog.info("switch end and wait insert data end ................")
pThread.join()
tdLog.info("check the consume result")
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdLog.printNoPrefix("======== Notes: must add '-N 5' for run the script ========")
self.checkDnodesStatusAndCreateMnode(self.dnodes)
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -204,6 +204,35 @@ class TMQCom:
tdLog.debug("insert data ............ [OK]")
return
def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
else:
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
......@@ -291,6 +320,17 @@ class TMQCom:
pThread.start()
return pThread
def threadFunctionForInsert(self, **paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
return
def asyncInsertData(self, paraDict):
pThread = threading.Thread(target=self.threadFunctionForInsert, kwargs=paraDict)
pThread.start()
return pThread
def close(self):
self.cursor.close()
......
......@@ -113,7 +113,7 @@ class TDTestCase:
tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
......
......@@ -112,7 +112,7 @@ python3 ./test.py -f 2-query/twa.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
......@@ -134,3 +134,4 @@ python3 ./test.py -f 7-tmq/schema.py
python3 ./test.py -f 7-tmq/stbFilter.py
python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py -f 7-tmq/tmqUdf.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
......@@ -60,10 +60,11 @@ if __name__ == "__main__":
stop = 0
restart = False
dnodeNums = 1
mnodeNums = 0
updateCfgDict = {}
execCmd = ""
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums'])
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums'])
for key, value in opts:
if key in ['-h', '--help']:
tdLog.printNoPrefix(
......@@ -79,7 +80,8 @@ if __name__ == "__main__":
tdLog.printNoPrefix('-d update cfg dict, base64 json str')
tdLog.printNoPrefix('-k not kill valgrind processer')
tdLog.printNoPrefix('-e eval str to run')
tdLog.printNoPrefix('-N create dnodes numbers clusters')
tdLog.printNoPrefix('-N create dnodes numbers in clusters')
tdLog.printNoPrefix('-M create mnode numbers in clusters')
sys.exit(0)
......@@ -133,6 +135,9 @@ if __name__ == "__main__":
if key in ['-N', '--dnodeNums']:
dnodeNums = value
if key in ['-M', '--mnodeNums']:
mnodeNums = value
if not execCmd == "":
tdDnodes.init(deployPath)
print(execCmd)
......@@ -244,9 +249,8 @@ if __name__ == "__main__":
tdDnodes.start(1)
tdCases.logSql(logSql)
else :
print("start cluster and dnodes number")
dnodeslist = cluster.configure_cluster(dnodes_nums=dnodeNums,independent=True)
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums,mnodeNums=mnodeNums)
tdDnodes = ClusterDnodes(dnodeslist)
tdDnodes.init(deployPath, masterIp)
tdDnodes.setTestCluster(testCluster)
......
......@@ -453,7 +453,17 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
#if 0
// get schema
//============================== stub =================================================//
for (int32_t i = 0; i < numOfFields; i++) {
taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
}
//============================== stub =================================================//
#endif
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) {
......@@ -656,12 +666,13 @@ void* consumeThreadFunc(void* param) {
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
exit(-1);
return NULL;
}
build_consumer(pInfo);
build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
assert(0);
return NULL;
}
......@@ -669,7 +680,9 @@ void* consumeThreadFunc(void* param) {
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
taosFprintfFile(g_fp, "tmq_subscribe()! reason: %s\n", tmq_err2str(err));
assert(0);
return NULL;
}
tmq_list_destroy(pInfo->topicList);
......@@ -688,14 +701,13 @@ void* consumeThreadFunc(void* param) {
err = tmq_unsubscribe(pInfo->tmq);
if (err != 0) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
/*pInfo->consumeMsgCnt = -1;*/
/*return NULL;*/
taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
}
err = tmq_consumer_close(pInfo->tmq);
if (err != 0) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
/*exit(-1);*/
taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
}
pInfo->tmq = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册