Commit feb5886e authored by cpwu

Merge branch '3.0' into cpwu/3.0

@@ -169,6 +169,7 @@ typedef struct SReqResultInfo {
   uint32_t numOfRows;
   uint64_t totalRows;
   uint32_t current;
+  bool     localResultFetched;
   bool     completed;
   int32_t  precision;
   bool     convertUcs4;
...
@@ -1905,6 +1905,10 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i
     tbLen = len1;
   }
+  if (dbLen <= 0 || tbLen <= 0) {
+    return -1;
+  }
   if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
     return -1;
   }
...
@@ -852,23 +852,33 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
   }

   // all data has returned to App already, no need to try again
-  if ((pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) && pResultInfo->completed) {
+  if (pResultInfo->completed && (pRequest->body.queryJob != 0)) {
     pResultInfo->numOfRows = 0;
     pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
     return;
   }

   // it is a local executed query, no need to do async fetch
-  if (pResultInfo->current < pResultInfo->numOfRows && pRequest->body.queryJob == 0) {
-    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
+  if (pRequest->body.queryJob == 0) {
+    ASSERT(pResultInfo->completed && pResultInfo->numOfRows >= 0);
+    if (pResultInfo->localResultFetched) {
+      pResultInfo->numOfRows = 0;
+      pResultInfo->current = 0;
+      pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
+    } else {
+      pResultInfo->localResultFetched = true;
+      pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
+    }
     return;
   }

   SSchedulerReq req = {
       .syncReq = false,
       .fetchFp = fetchCallback,
       .cbParam = pRequest,
   };
   schedulerFetchRows(pRequest->body.queryJob, &req);
 }
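Note: the new localResultFetched flag in SReqResultInfo lets a locally executed statement (queryJob == 0) hand its rows to the fetch callback exactly once and then report zero rows on the next call, so the usual asynchronous fetch loop still terminates. A minimal sketch of that loop against the public C client API is shown below; the endpoint, credentials, and table name are placeholders, and the sketch is illustrative rather than part of this commit.

#include <stdio.h>
#include <taos.h>

/* Fetch callback: numOfRows == 0 means the result set is exhausted
 * (for a local query this happens on the second invocation). */
static void fetch_cb(void *param, TAOS_RES *res, int numOfRows) {
  if (numOfRows <= 0) {                       /* done, or an error when negative */
    taos_free_result(res);
    return;
  }
  /* rows of the current batch could be consumed here, e.g. via taos_fetch_row() */
  taos_fetch_rows_a(res, fetch_cb, param);    /* request the next batch */
}

/* Query callback: starts the fetch loop once the statement has been executed. */
static void query_cb(void *param, TAOS_RES *res, int code) {
  if (code != 0) {
    taos_free_result(res);
    return;
  }
  taos_fetch_rows_a(res, fetch_cb, param);
}

int main(void) {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 0);  /* placeholder endpoint */
  if (conn == NULL) return 1;
  taos_query_a(conn, "select ts, c1 from db.ct1", query_cb, NULL);      /* hypothetical table */
  getchar();                                   /* crude wait for the async callbacks in this sketch */
  taos_close(conn);
  return 0;
}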
@@ -880,10 +890,10 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
   SReqResultInfo *pResultInfo = &pRequest->body.resInfo;

   // set the current block is all consumed
-  pResultInfo->current = pResultInfo->numOfRows;
   pResultInfo->convertUcs4 = false;

-  taos_fetch_rows_a(res, fp, param);
+  // it is a local executed query, no need to do async fetch
+  taos_fetch_rows_a(pRequest, fp, param);
 }

 const void *taos_get_raw_block(TAOS_RES *res) {
...
@@ -114,7 +114,7 @@ int32_t tsMinSlidingTime = 10;
 // the maxinum number of distict query result
 int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
-// 1 us for interval time range, changed accordingly
+// 1 database precision unit for interval time range, changed accordingly
 int32_t tsMinIntervalTime = 1;
 // 20sec, the maximum value of stream computing delay, changed accordingly
...
@@ -199,6 +199,7 @@ int32_t mndInitSync(SMnode *pMnode) {
   }

   // decrease election timer
+  setPingTimerMS(pMgmt->sync, 5000);
   setElectTimerMS(pMgmt->sync, 600);
   setHeartbeatTimerMS(pMgmt->sync, 300);
...
@@ -569,7 +569,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
     return -1;
   }
-  setPingTimerMS(pVnode->sync, 3000);
+  setPingTimerMS(pVnode->sync, 5000);
   setElectTimerMS(pVnode->sync, 500);
   setHeartbeatTimerMS(pVnode->sync, 100);
   return 0;
...
@@ -538,7 +538,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
   return code;
 }

-static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
+static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
   for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
     if (functionNeedToExecute(&pCtx[k])) {
       // todo add a dummy funtion to avoid process check
@@ -2969,25 +2969,10 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
     // the pDataBlock are always the same one, no need to call this again
     setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
     setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
-    code = doAggregateImpl(pOperator, 0, pSup->pCtx);
+    code = doAggregateImpl(pOperator, pSup->pCtx);
     if (code != 0) {
       longjmp(pTaskInfo->env, code);
     }
-#if 0 // test for encode/decode result info
-    if(pOperator->fpSet.encodeResultRow){
-      char *result = NULL;
-      int32_t length = 0;
-      pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
-      SAggSupporter* pSup = &pAggInfo->aggSup;
-      taosHashClear(pSup->pResultRowHashTable);
-      pInfo->resultRowInfo.size = 0;
-      pOperator->fpSet.decodeResultRow(pOperator, result);
-      if(result){
-        taosMemoryFree(result);
-      }
-    }
-#endif
   }

   closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
...
@@ -2860,101 +2860,3 @@ _error:
   return NULL;
 }
-
-static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
-  if (pOperator->status == OP_EXEC_DONE) {
-    return NULL;
-  }
-  SLastrowScanInfo* pInfo = pOperator->info;
-  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
-  int32_t           size = taosArrayGetSize(pInfo->pTableList);
-  if (size == 0) {
-    setTaskStatus(pTaskInfo, TASK_COMPLETED);
-    return NULL;
-  }
-  // check if it is a group by tbname
-  if (size == taosArrayGetSize(pInfo->pTableList)) {
-    blockDataCleanup(pInfo->pRes);
-    tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds);
-    return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
-  } else {
-    // todo fetch the result for each group
-  }
-  return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
-}
-
-static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
-  SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
-  blockDataDestroy(pInfo->pRes);
-  tsdbLastrowReaderClose(pInfo->pLastrowReader);
-  taosMemoryFreeClear(param);
-}
-
-SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList,
-                                         SExecTaskInfo* pTaskInfo) {
-  SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
-  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
-  if (pInfo == NULL || pOperator == NULL) {
-    goto _error;
-  }
-  pInfo->pTableList = pTableList;
-  pInfo->readHandle = *readHandle;
-  pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
-  int32_t numOfCols = 0;
-  pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols,
-                                             COL_MATCH_FROM_COL_ID);
-  int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t));
-  for (int32_t i = 0; i < numOfCols; ++i) {
-    SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
-    pCols[i] = pColMatch->colId;
-  }
-  pInfo->pSlotIds = taosMemoryMalloc(numOfCols * sizeof(pInfo->pSlotIds[0]));
-  for (int32_t i = 0; i < numOfCols; ++i) {
-    SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
-    for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) {
-      if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId &&
-          pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
-        pInfo->pSlotIds[pColMatch->targetSlotId] = -1;
-        break;
-      }
-      if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) {
-        pInfo->pSlotIds[pColMatch->targetSlotId] = j;
-        break;
-      }
-    }
-  }
-  tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, pCols, numOfCols,
-                        &pInfo->pLastrowReader);
-  taosMemoryFree(pCols);
-  pOperator->name = "LastrowScanOperator";
-  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
-  pOperator->blocking = false;
-  pOperator->status = OP_NOT_OPENED;
-  pOperator->info = pInfo;
-  pOperator->pTaskInfo = pTaskInfo;
-  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
-  initResultSizeInfo(pOperator, 1024);
-  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
-  pOperator->fpSet =
-      createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
-  pOperator->cost.openCost = 0;
-  return pOperator;
-
-_error:
-  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
-  taosMemoryFree(pInfo);
-  taosMemoryFree(pOperator);
-  return NULL;
-}
@@ -2770,7 +2770,6 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
       }
     }
     pInfo->hasResult = true;
-    // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
     pResInfo->numOfRes = 1;
     break;
   }
@@ -2867,7 +2866,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
     }
     pInfo->hasResult = true;
     pResInfo->numOfRes = 1;
-    // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
   }
   break;
 }
@@ -6010,6 +6008,15 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
     pInfo->hasResult = true;
     pResInfo->numOfRes = 1;
+    if (pCtx->subsidiaries.num > 0) {
+      STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
+      if (!pInfo->hasResult) {
+        saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
+      } else {
+        copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
+      }
+    }
   }
 }
...
@@ -2481,7 +2481,6 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
   int32_t code = tjsonToObject(pJson, jkSubplanId, jsonToSubplanId, &pNode->id);
   if (TSDB_CODE_SUCCESS == code) {
     tjsonGetNumberValue(pJson, jkSubplanType, pNode->subplanType, code);
-    ;
   }
   if (TSDB_CODE_SUCCESS == code) {
     code = tjsonGetIntValue(pJson, jkSubplanMsgType, &pNode->msgType);
...
@@ -956,7 +956,8 @@ void nodesDestroyNode(SNode* pNode) {
       }
     case QUERY_NODE_PHYSICAL_SUBPLAN: {
       SSubplan* pSubplan = (SSubplan*)pNode;
-      nodesDestroyList(pSubplan->pChildren);
+      // nodesDestroyList(pSubplan->pChildren);
+      nodesClearList(pSubplan->pChildren);
       nodesDestroyNode((SNode*)pSubplan->pNode);
       nodesDestroyNode((SNode*)pSubplan->pDataSink);
       nodesDestroyNode((SNode*)pSubplan->pTagCond);

@@ -972,7 +973,7 @@ void nodesDestroyNode(SNode* pNode) {
       SNode* pElement = NULL;
       FOREACH(pElement, pPlan->pSubplans) {
         if (first) {
-          first = false;
+          // first = false;
           nodesDestroyNode(pElement);
         } else {
           nodesClearList(((SNodeListNode*)pElement)->pNodeList);
...
@@ -556,6 +556,7 @@ signed_literal(A) ::= TIMESTAMP NK_STRING(B).
 signed_literal(A) ::= duration_literal(B). { A = releaseRawExprNode(pCxt, B); }
 signed_literal(A) ::= NULL(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_NULL, &B); }
 signed_literal(A) ::= literal_func(B). { A = releaseRawExprNode(pCxt, B); }
+signed_literal(A) ::= NK_QUESTION(B). { A = createPlaceholderValueNode(pCxt, &B); }

 %type literal_list { SNodeList* }
 %destructor literal_list { nodesDestroyList($$); }
...
@@ -133,7 +133,10 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
     assert(*p == TS_PATH_DELIMITER[0]);

     int32_t dbLen = p - pTableName->z;
+    if (dbLen <= 0) {
+      return buildInvalidOperationMsg(pMsgBuf, msg2);
+    }
     char name[TSDB_DB_FNAME_LEN] = {0};
     strncpy(name, pTableName->z, dbLen);
     dbLen = strdequote(name);
...
@@ -2173,14 +2173,28 @@ static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni
   return -1;
 }

+static const char* getPrecisionStr(uint8_t precision) {
+  switch (precision) {
+    case TSDB_TIME_PRECISION_MILLI:
+      return TSDB_TIME_PRECISION_MILLI_STR;
+    case TSDB_TIME_PRECISION_MICRO:
+      return TSDB_TIME_PRECISION_MICRO_STR;
+    case TSDB_TIME_PRECISION_NANO:
+      return TSDB_TIME_PRECISION_NANO_STR;
+    default:
+      break;
+  }
+  return "unknown";
+}

 static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) {
   uint8_t     precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision;
   SValueNode* pInter = (SValueNode*)pInterval->pInterval;
   bool        valInter = TIME_IS_VAR_DURATION(pInter->unit);
-  if (pInter->datum.i <= 0 ||
-      (!valInter && convertTimePrecision(pInter->datum.i, precision, TSDB_TIME_PRECISION_MICRO) < tsMinIntervalTime)) {
-    return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL, tsMinIntervalTime);
+  if (pInter->datum.i <= 0 || (!valInter && pInter->datum.i < tsMinIntervalTime)) {
+    return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL, tsMinIntervalTime,
+                                getPrecisionStr(precision));
   }

   if (NULL != pInterval->pOffset) {

@@ -2754,6 +2768,11 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
     }
   }

+  if (NULL == pPrimaryKeyExpr) {
+    return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM,
+                                   "Primary timestamp column can not be null");
+  }

   return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
 }

@@ -2998,8 +3017,7 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
   int32_t code =
       checkRangeOption(pCxt, "buffer", pOptions->buffer, TSDB_MIN_BUFFER_PER_VNODE, TSDB_MAX_BUFFER_PER_VNODE);
   if (TSDB_CODE_SUCCESS == code) {
-    code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST,
-                            TSDB_MAX_DB_CACHE_LAST);
+    code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST, TSDB_MAX_DB_CACHE_LAST);
   }
   if (TSDB_CODE_SUCCESS == code) {
     code = checkRangeOption(pCxt, "cacheLastSize", pOptions->cacheLastSize, TSDB_MIN_DB_CACHE_LAST_SIZE,
...
@@ -60,7 +60,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
     case TSDB_CODE_PAR_EXPRIE_STATEMENT:
       return "This statement is no longer supported";
     case TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL:
-      return "Interval cannot be less than %d us";
+      return "Interval cannot be less than %d %s";
     case TSDB_CODE_PAR_DB_NOT_SPECIFIED:
      return "Database not specified";
     case TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME:
...
(This diff has been collapsed.)
@@ -30,7 +30,7 @@ extern "C" {
 #define TIMER_MAX_MS 0x7FFFFFFF
 #define ENV_TICK_TIMER_MS 1000
-#define PING_TIMER_MS 1000
+#define PING_TIMER_MS 5000
 #define ELECT_TIMER_MS_MIN 1300
 #define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
 #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
...
@@ -273,16 +273,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
   }
   ASSERT(rid == pSyncNode->rid);
-  if (pSyncNode->peersNum == 0) {
-    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
-    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
-    return -1;
-  }
-  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
+  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
   taosReleaseRef(tsNodeRefId, pSyncNode->rid);
-  int32_t ret = syncLeaderTransferTo(rid, newLeader);
   return ret;
 }

@@ -293,25 +285,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
     return -1;
   }
   ASSERT(rid == pSyncNode->rid);
-  int32_t ret = 0;
-  if (pSyncNode->replicaNum == 1) {
-    sError("only one replica, cannot drop leader");
-    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
-    terrno = TSDB_CODE_SYN_ONE_REPLICA;
-    return -1;
-  }
-  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
-  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
-  pMsg->newLeaderId.vgId = pSyncNode->vgId;
-  pMsg->newNodeInfo = newLeader;
-  ASSERT(pMsg != NULL);
-  SRpcMsg rpcMsg = {0};
-  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
-  syncLeaderTransferDestroy(pMsg);
-  ret = syncNodePropose(pSyncNode, &rpcMsg, false);
+  int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
   taosReleaseRef(tsNodeRefId, pSyncNode->rid);
   return ret;
 }

@@ -337,6 +312,12 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
     return -1;
   }
+  do {
+    char logBuf[128];
+    snprintf(logBuf, sizeof(logBuf), "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
+    syncNodeEventLog(pSyncNode, logBuf);
+  } while (0);
   SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
   pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
   pMsg->newLeaderId.vgId = pSyncNode->vgId;

@@ -1118,19 +1099,13 @@ void syncNodeStart(SSyncNode* pSyncNode) {
     // Raft 3.6.2 Committing entries from previous terms
     syncNodeAppendNoop(pSyncNode);
     syncMaybeAdvanceCommitIndex(pSyncNode);
-    return;
+  } else {
+    syncNodeBecomeFollower(pSyncNode, "first start");
   }
-  syncNodeBecomeFollower(pSyncNode, "first start");
-  // int32_t ret = 0;
-  // ret = syncNodeStartPingTimer(pSyncNode);
-  // ASSERT(ret == 0);
-  if (gRaftDetailLog) {
-    syncNodeLog2("==state change become leader immediately==", pSyncNode);
-  }
+  int32_t ret = 0;
+  ret = syncNodeStartPingTimer(pSyncNode);
+  ASSERT(ret == 0);
 }

 void syncNodeStartStandBy(SSyncNode* pSyncNode) {

@@ -1147,8 +1122,6 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
 void syncNodeClose(SSyncNode* pSyncNode) {
   syncNodeEventLog(pSyncNode, "sync close");
-  // leader transfer
   int32_t ret;
   ASSERT(pSyncNode != NULL);

@@ -1183,14 +1156,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
     pSyncNode->pNewNodeReceiver = NULL;
   }
-  /*
-  if (pSyncNode->pSnapshot != NULL) {
-    taosMemoryFree(pSyncNode->pSnapshot);
-  }
-  */
-  // tsem_destroy(&pSyncNode->restoreSem);
   // free memory in syncFreeNode
   // taosMemoryFree(pSyncNode);
 }

@@ -1255,7 +1220,7 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
                  &pSyncNode->pPingTimer);
     atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
   } else {
-    sError("sync env is stop, syncNodeStartPingTimer");
+    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
   }
   return ret;
 }

@@ -1276,7 +1241,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
                  &pSyncNode->pElectTimer);
     atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
   } else {
-    sError("sync env is stop, syncNodeStartElectTimer");
+    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
   }
   return ret;
 }

@@ -1316,7 +1281,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
                  &pSyncNode->pHeartbeatTimer);
     atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
   } else {
-    sError("sync env is stop, syncNodeStartHeartbeatTimer");
+    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
   }
   return ret;
 }

@@ -2643,7 +2608,7 @@ const char* syncStr(ESyncState state) {
 static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
   SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
-  syncNodeEventLog(ths, "begin leader transfer");
+  syncNodeEventLog(ths, "do leader transfer");
   bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
   bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
...
@@ -17,6 +17,11 @@
 #include "syncElection.h"
 #include "syncReplication.h"

+int32_t syncNodeTimerRoutine(SSyncNode* ths) {
+  syncNodeEventLog(ths, "timer routines ... ");
+  return 0;
+}

 int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
   int32_t ret = 0;
   syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg);

@@ -24,8 +29,11 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
   if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
     if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
       ++(ths->pingTimerCounter);
       // syncNodePingAll(ths);
-      syncNodePingPeers(ths);
+      // syncNodePingPeers(ths);
+      syncNodeTimerRoutine(ths);
     }
   } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {

@@ -40,7 +48,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
       syncNodeReplicate(ths);
     }
   } else {
-    sTrace("unknown timeoutType:%d", pMsg->timeoutType);
+    sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType);
   }
   return ret;
...
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
if $data[0][0] != 1 then
return -1
endi
if $data[0][4] != ready then
goto check_dnode_ready
endi
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$loop_cnt = 0
check_dnode_ready_1:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnodes not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
if $data[0][4] != ready then
goto check_dnode_ready_1
endi
if $data[1][4] != ready then
goto check_dnode_ready_1
endi
if $data[2][4] != ready then
goto check_dnode_ready_1
endi
if $data[3][4] != ready then
goto check_dnode_ready_1
endi
$replica = 3
$vgroups = 1
print ============= create database
sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0
check_db_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> db not ready!
return -1
endi
sql show databases
print ===> rows: $rows
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
if $rows != 3 then
return -1
endi
if $data[2][19] != ready then
goto check_db_ready
endi
sql use db
$loop_cnt = 0
check_vg_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
if $rows != $vgroups then
return -1
endi
if $data[0][4] == leader then
if $data[0][6] == follower then
if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi
endi
elif $data[0][6] == leader then
if $data[0][4] == follower then
if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi
endi
elif $data[0][8] == leader then
if $data[0][4] == follower then
if $data[0][6] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi
endi
else
goto check_vg_ready
endi
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
sql create table ct1 using stb tags(1000)
print ===> write 100 records
$N = 100
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
#sql flush database db;
sleep 3000
print ===> stop dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
print ===> start dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
#########################################################
@@ -16,9 +16,9 @@ from tmqCommon import *
 class TDTestCase:
     def __init__(self):
-        self.vgroups    = 2
-        self.ctbNum     = 100
-        self.rowsPerTbl = 10000
+        self.vgroups    = 4
+        self.ctbNum     = 1000
+        self.rowsPerTbl = 1000

     def init(self, conn, logSql):
         tdLog.debug(f"start to excute {__file__}")

@@ -29,7 +29,7 @@ class TDTestCase:
         paraDict = {'dbName': 'dbt',
                     'dropFlag': 1,
                     'event': '',
-                    'vgroups': 3,
+                    'vgroups': 4,
                     'stbName': 'stb',
                     'colPrefix': 'c',
                     'tagPrefix': 't',

@@ -37,14 +37,14 @@ class TDTestCase:
                     'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                     'ctbPrefix': 'ctb',
                     'ctbStartIdx': 0,
-                    'ctbNum': 500,
+                    'ctbNum': 1000,
                     'rowsPerTbl': 1000,
-                    'batchNum': 500,
+                    'batchNum': 400,
                     'startTs': 1640966400000, # 2022-01-01 00:00:00.000
                     'pollDelay': 3,
                     'showMsg': 1,
                     'showRow': 1,
-                    'snapshot': 0}
+                    'snapshot': 1}
         paraDict['vgroups'] = self.vgroups
         paraDict['ctbNum'] = self.ctbNum

@@ -54,20 +54,21 @@ class TDTestCase:
         tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
         tdLog.info("create stb")
         tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
-        tdLog.info("create ctb")
-        tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
-                             ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
-        tdLog.info("insert data")
-        tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
-                                               ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
-                                               startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
+        # tdLog.info("create ctb")
+        # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
+        #                      ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
+        # tdLog.info("insert data")
+        # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
+        #                                        ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
+        #                                        startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
+        tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctbx",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
-        tdLog.info("restart taosd to ensure that the data falls into the disk")
-        # tdDnodes.stop(1)
-        # tdDnodes.start(1)
-        tdSql.query("flush database %s"%(paraDict['dbName']))
+        # tdLog.info("restart taosd to ensure that the data falls into the disk")
+        # tdSql.query("flush database %s"%(paraDict['dbName']))
         return

+    # insert data via auto-created child tables, then start the consumer
     def tmqCase1(self):
         tdLog.printNoPrefix("======== test case 1: ")
         paraDict = {'dbName': 'dbt',

@@ -90,28 +91,23 @@ class TDTestCase:
                     'showRow': 1,
                     'snapshot': 1}
-        # paraDict['vgroups'] = self.vgroups
-        # paraDict['ctbNum'] = self.ctbNum
-        # paraDict['rowsPerTbl'] = self.rowsPerTbl
+        paraDict['vgroups'] = self.vgroups
+        paraDict['ctbNum'] = self.ctbNum
+        paraDict['rowsPerTbl'] = self.rowsPerTbl
-        tmqCom.initConsumerTable()
-        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
-        tdLog.info("create stb")
-        tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
-        tdLog.info("create ctb")
-        tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
-                             ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
-        tdLog.info("insert data")
-        tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
-                                               ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
-                                               startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
+        # tmqCom.initConsumerTable()
+        # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
+        # tdLog.info("create stb")
+        # tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
+        # tdLog.info("insert data by auto create ctb")
+        # tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
         tdLog.info("create topics from stb1")
         topicFromStb1 = 'topic_stb1'
         queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
         sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
         tdLog.info("create topic sql: %s"%sqlString)
         tdSql.execute(sqlString)
         consumerId = 0
         expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]

@@ -120,19 +116,12 @@ class TDTestCase:
         ifManualCommit = 0
         keyList = 'group.id:cgrp1,\
                    enable.auto.commit:true,\
-                   auto.commit.interval.ms:500,\
+                   auto.commit.interval.ms:1000,\
                    auto.offset.reset:earliest'
         tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
         tdLog.info("start consume processor")
         tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
-        # time.sleep(3)
-        tmqCom.getStartCommitNotifyFromTmqsim()
-        tdLog.info("================= restart dnode ===========================")
-        tdDnodes.stop(1)
-        tdDnodes.start(1)
-        time.sleep(5)
         tdLog.info("insert process end, and start to check consume result")
         expectRows = 1

@@ -172,23 +161,23 @@ class TDTestCase:
                     'pollDelay': 5,
                     'showMsg': 1,
                     'showRow': 1,
-                    'snapshot': 1}
+                    'snapshot': 0}
-        # paraDict['vgroups'] = self.vgroups
-        # paraDict['ctbNum'] = self.ctbNum
-        # paraDict['rowsPerTbl'] = self.rowsPerTbl
+        paraDict['vgroups'] = self.vgroups
+        paraDict['ctbNum'] = self.ctbNum
+        paraDict['rowsPerTbl'] = self.rowsPerTbl
         tmqCom.initConsumerTable()
-        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
-        tdLog.info("create stb")
-        tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
-        tdLog.info("create ctb")
-        tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
-                             ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
-        tdLog.info("insert data")
-        tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
-                                               ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
-                                               startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
+        # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
+        # tdLog.info("create stb")
+        # tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
+        # tdLog.info("create ctb")
+        # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
+        #                      ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
+        # tdLog.info("insert data")
+        # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
+        #                                        ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
+        #                                        startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
         tdLog.info("create topics from stb1")
         topicFromStb1 = 'topic_stb1'
         queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])

@@ -211,14 +200,8 @@ class TDTestCase:
         tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
         tdLog.info("create some new child table and insert data ")
-        tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
+        tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctby",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
-        tmqCom.getStartCommitNotifyFromTmqsim()
-        tdLog.info("================= restart dnode ===========================")
-        tdDnodes.stop(1)
-        tdDnodes.start(1)
-        time.sleep(5)
         tdLog.info("insert process end, and start to check consume result")
         expectRows = 1
         resultList = tmqCom.selectConsumeResult(expectRows)

@@ -237,91 +220,13 @@ class TDTestCase:
         tdLog.printNoPrefix("======== test case 2 end ...... ")
-    # insert data via auto-created child tables, then start the consumer
-    def tmqCase3(self):
-        tdLog.printNoPrefix("======== test case 3: ")
-        paraDict = {'dbName': 'dbt',
-                    'dropFlag': 1,
-                    'event': '',
-                    'vgroups': 4,
-                    'stbName': 'stb',
-                    'colPrefix': 'c',
-                    'tagPrefix': 't',
-                    'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
-                    'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
-                    'ctbPrefix': 'ctb',
-                    'ctbStartIdx': 0,
-                    'ctbNum': 1000,
-                    'rowsPerTbl': 1000,
-                    'batchNum': 400,
-                    'startTs': 1640966400000, # 2022-01-01 00:00:00.000
-                    'pollDelay': 5,
-                    'showMsg': 1,
-                    'showRow': 1,
-                    'snapshot': 1}
-        paraDict['vgroups'] = self.vgroups
-        paraDict['ctbNum'] = self.ctbNum
-        paraDict['rowsPerTbl'] = self.rowsPerTbl
-        tmqCom.initConsumerTable()
-        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
-        tdLog.info("create stb")
-        tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
-        tdLog.info("insert data by auto create ctb")
-        tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
-        tdLog.info("create topics from stb1")
-        topicFromStb1 = 'topic_stb1'
-        queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
-        sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
-        tdLog.info("create topic sql: %s"%sqlString)
-        tdSql.execute(sqlString)
-        consumerId = 0
-        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
-        topicList = topicFromStb1
-        ifcheckdata = 0
-        ifManualCommit = 0
-        keyList = 'group.id:cgrp1,\
-                   enable.auto.commit:true,\
-                   auto.commit.interval.ms:1000,\
-                   auto.offset.reset:earliest'
-        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
-        tdLog.info("start consume processor")
-        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
-        # tdLog.info("================= restart dnode ===========================")
-        # tdDnodes.stop(1)
-        # tdDnodes.start(1)
-        # time.sleep(2)
-        tdLog.info("insert process end, and start to check consume result")
-        expectRows = 1
-        resultList = tmqCom.selectConsumeResult(expectRows)
-        totalConsumeRows = 0
-        for i in range(expectRows):
-            totalConsumeRows += resultList[i]
-        tdSql.query(queryString)
-        totalRowsInserted = tdSql.getRows()
-        if totalConsumeRows != totalRowsInserted:
-            tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted))
-            tdLog.exit("tmq consume rows error!")
-        tdSql.query("drop topic %s"%topicFromStb1)
-        tdLog.printNoPrefix("======== test case 3 end ...... ")
     def run(self):
         tdSql.prepare()
-        # self.tmqCase1()
-        # self.tmqCase2()
-        self.tmqCase3()
+        self.prepareTestEnv()
+        self.tmqCase1()
+        # self.tmqCase2()  TD-17267

     def stop(self):
         tdSql.close()
...
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 2
self.ctbNum = 100
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 3,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 500,
'rowsPerTbl': 1000,
'batchNum': 500,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 400,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
# paraDict['vgroups'] = self.vgroups
# paraDict['ctbNum'] = self.ctbNum
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:500,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# time.sleep(3)
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("================= restart dnode ===========================")
tdDnodes.stop(1)
tdDnodes.start(1)
time.sleep(5)
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
if totalConsumeRows != totalRowsInserted:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
# paraDict['vgroups'] = self.vgroups
# paraDict['ctbNum'] = self.ctbNum
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("create some new child table and insert data ")
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("================= restart dnode ===========================")
tdDnodes.stop(1)
tdDnodes.start(1)
time.sleep(5)
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
if totalConsumeRows != totalRowsInserted:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
@@ -176,8 +176,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
 python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
 python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
 python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
+python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
+#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py

 #------------querPolicy 2-----------
...