提交 d4e0a27e 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode

......@@ -34,16 +34,6 @@ def abort_previous(){
}
def pre_test(){
sh 'hostname'
sh '''
date
sudo rmtaos || echo "taosd has not installed"
'''
sh '''
killall -9 taosd ||echo "no taosd running"
killall -9 gdb || echo "no gdb running"
killall -9 python3.8 || echo "no python program running"
cd ${WKC}
'''
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
......@@ -81,10 +71,10 @@ def pre_test(){
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log|head -n20
git log -5
cd ${WK}
git pull >/dev/null
git log|head -n20
git log -5
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
......@@ -92,10 +82,10 @@ def pre_test(){
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log|head -n20
git log -5
cd ${WKC}
git pull >/dev/null
git log|head -n20
git log -5
'''
} else {
sh '''
......@@ -106,21 +96,10 @@ def pre_test(){
cd ${WKC}
git submodule update --init --recursive
'''
sh '''
cd ${WK}
export TZ=Asia/Harbin
date
rm -rf debug
mkdir debug
cd debug
cmake .. > /dev/null
make -j4> /dev/null
'''
sh '''
cd ${WKPY}
git reset --hard
git pull
pip3 install .
'''
return 1
}
......@@ -131,12 +110,14 @@ def pre_test_win(){
time /t
taskkill /f /t /im python.exe
taskkill /f /t /im bash.exe
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine\\debug
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug
exit 0
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git reset --hard
git fetch || git fetch
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git reset --hard
git fetch || git fetch
git checkout -f
......@@ -144,39 +125,73 @@ def pre_test_win(){
script {
if (env.CHANGE_TARGET == 'master') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout master
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout master
'''
} else if(env.CHANGE_TARGET == '2.0') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout 2.0
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout 2.0
'''
} else if(env.CHANGE_TARGET == '3.0') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout 3.0
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout 3.0
'''
} else {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout develop
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout develop
'''
}
}
script {
if (env.CHANGE_URL =~ /\/TDengine\//) {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git pull
git log -5
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log -5
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log -5
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git pull
git log -5
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
}
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git branch
git pull || git pull
git fetch origin +refs/pull/%CHANGE_ID%/merge
git checkout -qf FETCH_HEAD
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git submodule update --init --recursive
'''
}
def pre_test_build_win() {
bat '''
echo "building ..."
time /t
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
mkdir debug
cd debug
call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64
......@@ -192,6 +207,7 @@ pipeline {
agent none
options { skipDefaultCheckout() }
environment{
WKDIR = '/var/lib/jenkins/workspace'
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC = '/var/lib/jenkins/workspace/TDinternal/community'
WKPY = '/var/lib/jenkins/workspace/taos-connector-python'
......@@ -206,39 +222,22 @@ pipeline {
changeRequest()
}
steps {
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 20, unit: 'MINUTES'){
pre_test()
script {
if (env.CHANGE_URL =~ /\/TDengine\//) {
sh '''
cd ${WK}/debug
ctest -VV
'''
sh '''
export LD_LIBRARY_PATH=${WK}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
cd ${WKC}/debug
ctest -VV
'''
sh '''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
sh '''
cd ${WKC}/tests/parallel_test
date
time ./container_build.sh -w ${WKDIR} -t 8 -e
rm -f /tmp/cases.task
./collect_cases.sh -e
'''
sh '''
cd ${WKC}/tests/parallel_test
date
time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
'''
}
sh '''
cd ${WKC}/tests
./test-all.sh b1fq
'''
}
}
}
......
......@@ -173,6 +173,7 @@ typedef struct SqlFunctionCtx {
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0
////////////////////////////////////////////////////////////////
int32_t startRow; // start row index
int32_t size; // handled processed row number
......@@ -183,7 +184,6 @@ typedef struct SqlFunctionCtx {
bool hasNull; // null value exist in current block, TODO remove it
bool requireNull; // require null in some function, TODO remove it
int32_t columnIndex; // TODO remove it
uint8_t currentStage; // record current running step, default: 0
bool isAggSet;
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
bool stableQuery;
......
......@@ -706,7 +706,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock,
SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo,
SNode* pConditions, SOperatorInfo* pOperatorDumy, SInterval* pInterval);
SNode* pConditions, SOperatorInfo* pOperatorDumy);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
......
......@@ -746,7 +746,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].pSrcBlock = pBlock;
pCtx[i].currentStage = scanFlag;
pCtx[i].scanFlag = scanFlag;
SInputColumnInfoData* pInput = &pCtx[i].input;
pInput->uid = pBlock->info.uid;
......@@ -826,23 +826,22 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
return code;
}
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs;
// this can be set during create the struct
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) {
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s call aggregate function error happens, code : %s",
GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
pOperator->pTaskInfo->code = code;
longjmp(pOperator->pTaskInfo->env, code);
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
return code;
}
}
}
}
return TSDB_CODE_SUCCESS;
}
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
......@@ -998,18 +997,22 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return false;
}
if (isRowEntryCompleted(pResInfo)) {
return false;
if (pCtx->scanFlag == REPEAT_SCAN) {
return fmIsRepeatScanFunc(pCtx->functionId);
}
if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// return QUERY_IS_ASC_QUERY(pQueryAttr);
if (isRowEntryCompleted(pResInfo)) {
return false;
}
// denote the order type
if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// return pCtx->param[0].i == pQueryAttr->order.order;
}
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
// }
//
// // denote the order type
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// // return pCtx->param[0].i == pQueryAttr->order.order;
// }
// in the reverse table scan, only the following functions need to be executed
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
......@@ -1944,7 +1947,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
cleanupResultRowEntry(pEntry);
pCtx[i].resultInfo = pEntry;
pCtx[i].currentStage = stage;
pCtx[i].scanFlag = stage;
// set the timestamp output buffer for top/bottom/diff query
// int32_t fid = pCtx[i].functionId;
......@@ -3724,7 +3727,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t order = TSDB_ORDER_ASC;
......@@ -3738,9 +3740,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
if (pBlock == NULL) {
break;
}
// if (pAggInfo->current != NULL) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
......@@ -3750,17 +3749,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->pScalarExprInfo != NULL) {
code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
pAggInfo->numOfScalarExpr, NULL);
pAggInfo->numOfScalarExpr, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
longjmp(pTaskInfo->env, pTaskInfo->code);
longjmp(pTaskInfo->env, code);
}
}
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
doAggregateImpl(pOperator, 0, pInfo->pCtx);
code = doAggregateImpl(pOperator, 0, pInfo->pCtx);
if (code != 0) {
longjmp(pTaskInfo->env, code);
}
#if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){
......@@ -4807,17 +4808,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SSDataBlock* pResBlockDumy = createResDataBlock(pDescNode);
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
SInterval interval = extractIntervalInfo(pTableScanNode);
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
......@@ -4825,7 +4815,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions, pOperatorDumy, &interval);
pScanPhyNode->node.pConditions, pOperatorDumy);
taosArrayDestroy(tableIdList);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
......
......@@ -260,6 +260,53 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
}
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr == 0) {
return;
}
SMetaReader mr = {0};
metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0);
metaGetTableEntryByUid(&mr, pBlock->info.uid);
for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) {
SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j];
int32_t dstSlotId = pExpr->base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
int32_t functionId = pExpr->pExpr->_function.functionId;
// this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) {
struct SScalarFuncExecFuncs fpSet = {0};
fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
SScalarParam srcParam = {
.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
} else { // these are tags
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL));
}
}
}
metaReaderClear(&mr);
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
......@@ -285,23 +332,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) {
int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
struct SScalarFuncExecFuncs fpSet;
fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet);
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
addTagPseudoColumnData(pTableScanInfo, pBlock);
}
return pBlock;
......@@ -751,8 +782,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader,
SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList,
SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy,
SInterval* pInterval) {
SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy ) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -760,6 +790,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
goto _error;
}
STableScanInfo* pSTInfo = (STableScanInfo*)pOperatorDumy->info;
int32_t numOfOutput = taosArrayGetSize(pColList);
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
......@@ -792,7 +824,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
}
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInitP(pInterval, 10000); // TODO(liuyao) get watermark from physical plan
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan
if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
......@@ -805,7 +837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pOperatorDumy;
pInfo->interval = *pInterval;
pInfo->interval = pSTInfo->interval;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
......@@ -1645,7 +1645,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
int32_t type = pCol->info.type;
SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed
......
......@@ -37,7 +37,7 @@
#define GET_TRUE_DATA_TYPE() \
int32_t type = 0; \
if (pCtx->currentStage == MERGE_STAGE) { \
if (pCtx->scanFlag == MERGE_STAGE) { \
type = pCtx->resDataInfo.type; \
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \
} else { \
......@@ -908,7 +908,7 @@ static void avg_func_merge(SqlFunctionCtx *pCtx) {
static void avg_finalizer(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) {
......@@ -1152,7 +1152,7 @@ static void stddev_function(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) {
if (pCtx->scanFlag == REPEAT_SCAN && pStd->stage == 0) {
pStd->stage++;
avg_finalizer(pCtx);
......@@ -1814,7 +1814,7 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) {
return (STopBotInfo*) pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
......@@ -1956,7 +1956,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) {
for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type;
// do_top_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp,
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag);
}
SET_VAL(pCtx, pInput->num, pOutput->num);
......@@ -2013,7 +2013,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) {
for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type;
// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type,
// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag);
}
SET_VAL(pCtx, pInput->num, pOutput->num);
......@@ -2073,7 +2073,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed
......@@ -2180,7 +2180,7 @@ static SAPercentileInfo *getAPerctInfo(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) {
pInfo = (SAPercentileInfo*) pCtx->pOutput;
} else {
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -2270,7 +2270,7 @@ static void apercentile_finalizer(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
// if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
// assert(pOutput->pHisto->numOfElems > 0);
//
......@@ -2510,7 +2510,7 @@ static void copy_function(SqlFunctionCtx *pCtx);
static void tag_function(SqlFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1);
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
copy_function(pCtx);
} else {
taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true);
......@@ -2966,7 +2966,7 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
// this is the server-side setup function in client-side, the secondary merge do not need this procedure
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
// pCtx->param[0].param.d = DBL_MAX;
// pCtx->param[3].param.d = -DBL_MAX;
} else {
......@@ -3086,7 +3086,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) {
*/
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
// if (pResInfo->hasResult != DATA_SET_FLAG) {
......
......@@ -228,10 +228,12 @@ static void setScanWindowInfo(SScanLogicNode* pScan) {
static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
SOsdInfo info = {0};
int32_t code = osdMatch(pCxt, pLogicNode, &info);
if (TSDB_CODE_SUCCESS == code && info.pScan) {
setScanWindowInfo((SScanLogicNode*)info.pScan);
}
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
info.pScan->dataRequired = osdGetDataRequired(info.pSdrFuncs);
info.pScan->pDynamicScanFuncs = info.pDsoFuncs;
setScanWindowInfo((SScanLogicNode*)info.pScan);
OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_OSD);
pCxt->optimized = true;
}
......
......@@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
blockDebugShowData(pRes);
// blockDebugShowData(pRes);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
......
......@@ -13,7 +13,6 @@
*/
#ifdef USE_UV
#include "transComm.h"
typedef struct SCliConn {
......@@ -21,15 +20,16 @@ typedef struct SCliConn {
uv_connect_t connReq;
uv_stream_t* stream;
uv_write_t writeReq;
void* hostThrd;
SConnBuffer readBuf;
void* data;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
int hThrdIdx;
STransCtx ctx;
void* hostThrd;
int hThrdIdx;
SConnBuffer readBuf;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
STransCtx ctx;
bool broken; // link broken or not
ConnStatus status; //
......@@ -157,13 +157,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \
while (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
if (T_REF_VAL_GET(conn) == 1) { \
if (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
destroyCmsg(pMsg); \
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
return; \
} \
} while (0)
......@@ -707,7 +705,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st;
tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el);
tTrace("%s cli msg tran time cost: %" PRIu64 "us, threadID: %" PRId64 "", ((STrans*)pThrd->pTransInst)->label, el,
pThrd->thread);
STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst;
......@@ -1030,8 +1029,8 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet),
EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle);
tDebug("send request at thread:%d, threadID: %" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle);
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
}
......@@ -1058,8 +1057,8 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet),
EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle);
tDebug("send request at thread:%d, threadID:%" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle);
transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem;
......
......@@ -35,7 +35,6 @@ typedef struct SSrvConn {
uv_timer_t pTimer;
queue queue;
int persist; // persist connection or not
SConnBuffer readBuf; // read buf,
int inType;
void* pTransInst; // rpc init
......@@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet
static SSrvConn* createConn(void* hThrd);
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
static int reallocConnRefHandle(SSrvConn* conn);
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
......@@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
// add handle loop
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName);
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \
......@@ -180,6 +180,7 @@ static bool addHandleToAcceptloop(void* arg);
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
reallocConnRefHandle(conn); \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
......@@ -360,10 +361,14 @@ void uvOnSendCb(uv_write_t* req, int status) {
tTrace("server conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) {
SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
if (msg->type == Release && conn->status != ConnNormal) {
conn->status = ConnNormal;
transUnrefSrvHandle(conn);
}
// if (msg->type == Release && conn->status != ConnNormal) {
// conn->status = ConnNormal;
// transUnrefSrvHandle(conn);
// reallocConnRefHandle(conn);
// destroySmsg(msg);
// transQueueClear(&conn->srvMsgs);
// return;
//}
destroySmsg(msg);
// send second data, just use for push
if (!transQueueEmpty(&conn->srvMsgs)) {
......@@ -421,8 +426,15 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1;
} else {
pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType;
if (smsg->type == Release) {
pHead->msgType = 0;
pConn->status = ConnNormal;
transUnrefSrvHandle(pConn);
} else {
pHead->msgType = pMsg->msgType;
}
}
pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code);
......@@ -517,7 +529,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
int64_t refId = transMsg.refId;
SExHandle* exh2 = uvAcquireExHandle(refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("server handle %p except msg, ignore it", exh1);
tTrace("server handle except msg %p, ignore it", exh1);
uvReleaseExHandle(refId);
destroySmsg(msg);
continue;
......@@ -581,11 +593,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
if (pObj->numOfWorkerReady < pObj->numOfThreads) {
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady);
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
pObj->numOfWorkerReady);
uv_close((uv_handle_t*)cli, NULL);
return;
}
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
wr->data = cli;
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
......@@ -681,14 +694,14 @@ void* transAcceptThread(void* arg) {
return NULL;
}
void uvOnPipeConnectionCb(uv_connect_t *connect, int status) {
void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
if (status != 0) {
return;
}
SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
}
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) {
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
if (0 != uv_loop_init(pThrd->loop)) {
return false;
......@@ -787,6 +800,19 @@ static void destroyConn(SSrvConn* conn, bool clear) {
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
}
}
static int reallocConnRefHandle(SSrvConn* conn) {
uvReleaseExHandle(conn->refId);
uvRemoveExHandle(conn->refId);
// avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = uvAddExHandle(exh);
uvAcquireExHandle(exh->refId);
conn->refId = exh->refId;
return 0;
}
static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data;
if (conn == NULL) {
......@@ -822,7 +848,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
ASSERT(status == 0);
SServerObj* srv = container_of(handle, SServerObj, pipeListen);
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
......@@ -859,7 +885,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
#else
char pipeName[PATH_MAX] = {0};
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId());
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(),
taosGetSelfPthreadId());
#endif
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
......@@ -874,7 +901,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
thrd->pipe = &(srv->pipe[i][1]); // init read
if (false == addHandleToWorkloop(thrd,pipeName)) {
if (false == addHandleToWorkloop(thrd, pipeName)) {
goto End;
}
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
......@@ -959,6 +986,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
reallocConnRefHandle(conn);
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}
......
#!/bin/bash
case_file=/tmp/cases.task
function usage() {
echo "$0"
echo -e "\t -o output case file"
echo -e "\t -e enterprise edition"
echo -e "\t -h help"
}
ent=0
while getopts "o:eh" opt; do
case $opt in
o)
case_file=$OPTARG
;;
e)
ent=1
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
script_dir=`dirname $0`
cd $script_dir
if [ $ent -eq 0 ]; then
echo ",,unit-test,bash test.sh" >$case_file
else
echo ",,unit-test,bash test.sh -e" >$case_file
fi
cat ../script/jenkins/basic.txt |grep -v "^#"|grep -v "^$"|sed "s/^/,,script,/" >>$case_file
grep "^python" ../system-test/fulltest.sh |sed "s/^/,,system-test,/" >>$case_file
exit 0
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -w work dir"
echo -e "\t -e enterprise edition"
echo -e "\t -t make thread count"
echo -e "\t -h help"
}
ent=0
while getopts "w:t:eh" opt; do
case $opt in
w)
WORKDIR=$OPTARG
;;
e)
ent=1
;;
t)
THREAD_COUNT=$OPTARG
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
if [ -z "$WORKDIR" ]; then
usage
exit 1
fi
if [ -z "$THREAD_COUNT" ]; then
THREAD_COUNT=1
fi
ulimit -c unlimited
if [ $ent -eq 0 ]; then
REP_DIR=/home/TDengine
REP_MOUNT_PARAM=$WORKDIR/TDengine:/home/TDengine
else
REP_DIR=/home/TDinternal
REP_MOUNT_PARAM=$WORKDIR/TDinternal:/home/TDinternal
fi
docker run \
-v $REP_MOUNT_PARAM \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_TOOLS=true;make -j $THREAD_COUNT"
ret=$?
exit $ret
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -m vm config file"
echo -e "\t -t task file"
echo -e "\t -b branch"
echo -e "\t -l log dir"
echo -e "\t -e enterprise edition"
echo -e "\t -o default timeout value"
echo -e "\t -h help"
}
ent=0
while getopts "m:t:b:l:o:eh" opt; do
case $opt in
m)
config_file=$OPTARG
;;
t)
t_file=$OPTARG
;;
b)
branch=$OPTARG
;;
l)
log_dir=$OPTARG
;;
e)
ent=1
;;
o)
timeout_param="-o $OPTARG"
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
#config_file=$1
if [ -z $config_file ]; then
usage
exit 1
fi
if [ ! -f $config_file ]; then
echo "$config_file not found"
usage
exit 1
fi
#t_file=$2
if [ -z $t_file ]; then
usage
exit 1
fi
if [ ! -f $t_file ]; then
echo "$t_file not found"
usage
exit 1
fi
date_tag=`date +%Y%m%d-%H%M%S`
if [ -z $log_dir ]; then
log_dir="log/${branch}_${date_tag}"
else
log_dir="$log_dir/${branch}_${date_tag}"
fi
hosts=()
usernames=()
passwords=()
workdirs=()
threads=()
i=0
while [ 1 ]; do
host=`jq .[$i].host $config_file`
if [ "$host" = "null" ]; then
break
fi
username=`jq .[$i].username $config_file`
if [ "$username" = "null" ]; then
break
fi
password=`jq .[$i].password $config_file`
if [ "$password" = "null" ]; then
password=""
fi
workdir=`jq .[$i].workdir $config_file`
if [ "$workdir" = "null" ]; then
break
fi
thread=`jq .[$i].thread $config_file`
if [ "$thread" = "null" ]; then
break
fi
hosts[i]=`echo $host|sed 's/\"$//'|sed 's/^\"//'`
usernames[i]=`echo $username|sed 's/\"$//'|sed 's/^\"//'`
passwords[i]=`echo $password|sed 's/\"$//'|sed 's/^\"//'`
workdirs[i]=`echo $workdir|sed 's/\"$//'|sed 's/^\"//'`
threads[i]=$thread
i=$(( i + 1 ))
done
function prepare_cases() {
cat $t_file >>$task_file
local i=0
while [ $i -lt $1 ]; do
echo "%%FINISHED%%" >>$task_file
i=$(( i + 1 ))
done
}
function clean_tmp() {
# clean tmp dir
local index=$1
local ssh_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
if [ -z ${passwords[index]} ]; then
ssh_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
fi
local cmd="${ssh_script} rm -rf ${workdirs[index]}/tmp"
${cmd}
}
function run_thread() {
local index=$1
local thread_no=$2
local runcase_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
if [ -z ${passwords[index]} ]; then
runcase_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
fi
local count=0
local script="${workdirs[index]}/TDengine/tests/parallel_test/run_container.sh"
if [ $ent -ne 0 ]; then
local script="${workdirs[index]}/TDinternal/community/tests/parallel_test/run_container.sh -e"
fi
local cmd="${runcase_script} ${script}"
# script="echo"
while [ 1 ]; do
local line=`flock -x $lock_file -c "head -n1 $task_file;sed -i \"1d\" $task_file"`
if [ "x$line" = "x%%FINISHED%%" ]; then
# echo "$index . $thread_no EXIT"
break
fi
if [ -z "$line" ]; then
continue
fi
echo "$line"|grep -q "^#"
if [ $? -eq 0 ]; then
continue
fi
local case_redo_time=`echo "$line"|cut -d, -f2`
if [ -z "$case_redo_time" ]; then
case_redo_time=${DEFAULT_RETRY_TIME:-2}
fi
local exec_dir=`echo "$line"|cut -d, -f3`
local case_cmd=`echo "$line"|cut -d, -f4`
local case_file=""
echo "$case_cmd"|grep -q "\.sh"
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.sh"|awk '{print $NF}'`
fi
echo "$case_cmd"|grep -q "^python3"
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.py"|awk '{print $NF}'`
fi
echo "$case_cmd"|grep -q "\.sim"
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.sim"|awk '{print $NF}'`
fi
if [ -z "$case_file" ]; then
case_file=`echo "$case_cmd"|awk '{print $NF}'`
fi
if [ -z "$case_file" ]; then
continue
fi
case_file="$exec_dir/${case_file}.${index}.${thread_no}.${count}"
count=$(( count + 1 ))
local case_path=`dirname "$case_file"`
if [ ! -z "$case_path" ]; then
mkdir -p $log_dir/$case_path
fi
cmd="${runcase_script} ${script} -w ${workdirs[index]} -c \"${case_cmd}\" -t ${thread_no} -d ${exec_dir} ${timeout_param}"
# echo "$thread_no $count $cmd"
local ret=0
local redo_count=1
start_time=`date +%s`
while [ ${redo_count} -lt 6 ]; do
if [ -f $log_dir/$case_file.log ]; then
cp $log_dir/$case_file.log $log_dir/$case_file.${redo_count}.redolog
fi
echo "${hosts[index]}-${thread_no} order:${count}, redo:${redo_count} task:${line}" >$log_dir/$case_file.log
echo -e "\e[33m >>>>> \e[0m ${case_cmd}"
date >>$log_dir/$case_file.log
# $cmd 2>&1 | tee -a $log_dir/$case_file.log
# ret=${PIPESTATUS[0]}
$cmd >>$log_dir/$case_file.log 2>&1
ret=$?
echo "${hosts[index]} `date` ret:${ret}" >>$log_dir/$case_file.log
if [ $ret -eq 0 ]; then
break
fi
redo=0
grep -q "wait too long for taosd start" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "kex_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "ssh_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "kex_exchange_identification: read: Connection reset by peer" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "Database not ready" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "Unable to establish connection" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
if [ $redo_count -lt $case_redo_time ]; then
redo=1
fi
if [ $redo -eq 0 ]; then
break
fi
redo_count=$(( redo_count + 1 ))
done
end_time=`date +%s`
echo >>$log_dir/$case_file.log
echo "${hosts[index]} execute time: $(( end_time - start_time ))s" >>$log_dir/$case_file.log
# echo "$thread_no ${line} DONE"
if [ $ret -ne 0 ]; then
flock -x $lock_file -c "echo \"${hosts[index]} ret:${ret} ${line}\" >>$log_dir/failed.log"
mkdir -p $log_dir/${case_file}.coredump
local remote_coredump_dir="${workdirs[index]}/tmp/thread_volume/$thread_no/coredump"
local scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
if [ -z ${passwords[index]} ]; then
scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
fi
cmd="$scpcmd:${remote_coredump_dir}/* $log_dir/${case_file}.coredump/"
$cmd # 2>/dev/null
local case_info=`echo "$line"|cut -d, -f 3,4`
local corefile=`ls $log_dir/${case_file}.coredump/`
corefile=`find $log_dir/${case_file}.coredump/ -name "core.*"`
echo -e "$case_info \e[31m failed\e[0m"
echo "=========================log============================"
cat $log_dir/$case_file.log
echo "====================================================="
echo -e "\e[34m log file: $log_dir/$case_file.log \e[0m"
if [ ! -z "$corefile" ]; then
echo -e "\e[34m corefiles: $corefile \e[0m"
local build_dir=$log_dir/build_${hosts[index]}
local remote_build_dir="${workdirs[index]}/TDengine/debug/build"
if [ $ent -ne 0 ]; then
remote_build_dir="${workdirs[index]}/TDinternal/debug/build"
fi
mkdir $build_dir 2>/dev/null
if [ $? -eq 0 ]; then
# scp build binary
cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/"
echo "$cmd"
$cmd >/dev/null
fi
fi
# get remote sim dir
local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no"
local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
if [ -z ${passwords[index]} ]; then
tarcmd="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
fi
cmd="$tarcmd sh -c \"cd $remote_sim_dir; tar -czf sim.tar.gz sim\""
$cmd
local remote_sim_tar="${workdirs[index]}/tmp/thread_volume/$thread_no/sim.tar.gz"
scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
if [ -z ${passwords[index]} ]; then
scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
fi
cmd="$scpcmd:${remote_sim_tar} $log_dir/${case_file}.sim.tar.gz"
$cmd
fi
done
}
# echo "hosts: ${hosts[@]}"
# echo "usernames: ${usernames[@]}"
# echo "passwords: ${passwords[@]}"
# echo "workdirs: ${workdirs[@]}"
# echo "threads: ${threads[@]}"
# TODO: check host accessibility
i=0
while [ $i -lt ${#hosts[*]} ]; do
clean_tmp $i &
i=$(( i + 1 ))
done
wait
mkdir -p $log_dir
rm -rf $log_dir/*
task_file=$log_dir/$$.task
lock_file=$log_dir/$$.lock
i=0
j=0
while [ $i -lt ${#hosts[*]} ]; do
j=$(( j + threads[i] ))
i=$(( i + 1 ))
done
prepare_cases $j
i=0
while [ $i -lt ${#hosts[*]} ]; do
j=0
while [ $j -lt ${threads[i]} ]; do
run_thread $i $j &
j=$(( j + 1 ))
done
i=$(( i + 1 ))
done
wait
rm -f $lock_file
rm -f $task_file
# docker ps -a|grep -v CONTAINER|awk '{print $1}'|xargs docker rm -f
RET=0
i=1
if [ -f "$log_dir/failed.log" ]; then
echo "====================================================="
while read line; do
line=`echo "$line"|cut -d, -f 3,4`
echo -e "$i. $line \e[31m failed\e[0m" >&2
i=$(( i + 1 ))
done <$log_dir/failed.log
RET=1
fi
echo "${log_dir}" >&2
date
exit $RET
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -d execution dir"
echo -e "\t -c command"
echo -e "\t -e enterprise edition"
echo -e "\t -o default timeout value"
echo -e "\t -h help"
}
ent=0
while getopts "d:c:o:eh" opt; do
case $opt in
d)
exec_dir=$OPTARG
;;
c)
cmd=$OPTARG
;;
o)
TIMEOUT_CMD="timeout $OPTARG"
;;
e)
ent=1
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
if [ -z "$exec_dir" ]; then
usage
exit 0
fi
if [ -z "$cmd" ]; then
usage
exit 0
fi
if [ $ent -eq 0 ]; then
export PATH=$PATH:/home/TDengine/debug/build/bin
export LD_LIBRARY_PATH=/home/TDengine/debug/build/lib
ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null
CONTAINER_TESTDIR=/home/TDengine
else
export PATH=$PATH:/home/TDinternal/debug/build/bin
export LD_LIBRARY_PATH=/home/TDinternal/debug/build/lib
ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null
CONTAINER_TESTDIR=/home/TDinternal/community
fi
mkdir -p /var/lib/taos/subscribe
mkdir -p /var/log/taos
mkdir -p /var/lib/taos
cd $CONTAINER_TESTDIR/tests/$exec_dir
ulimit -c unlimited
$TIMEOUT_CMD $cmd
RET=$?
if [ $RET -ne 0 ]; then
pwd
fi
exit $RET
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -w work dir"
echo -e "\t -d execution dir"
echo -e "\t -c command"
echo -e "\t -t thread number"
echo -e "\t -e enterprise edition"
echo -e "\t -o default timeout value"
echo -e "\t -h help"
}
ent=0
while getopts "w:d:c:t:o:eh" opt; do
case $opt in
w)
WORKDIR=$OPTARG
;;
d)
exec_dir=$OPTARG
;;
c)
cmd=$OPTARG
;;
t)
thread_no=$OPTARG
;;
e)
ent=1
;;
o)
extra_param="-o $OPTARG"
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
if [ -z "$WORKDIR" ]; then
usage
exit 1
fi
if [ -z "$exec_dir" ]; then
usage
exit 1
fi
if [ -z "$cmd" ]; then
usage
exit 1
fi
if [ -z "$thread_no" ]; then
usage
exit 1
fi
if [ $ent -ne 0 ]; then
# enterprise edition
extra_param="$extra_param -e"
INTERNAL_REPDIR=$WORKDIR/TDinternal
REPDIR=$INTERNAL_REPDIR/community
CONTAINER_TESTDIR=/home/TDinternal/community
SIM_DIR=/home/TDinternal/sim
REP_MOUNT_PARAM="$INTERNAL_REPDIR:/home/TDinternal"
else
# community edition
REPDIR=$WORKDIR/TDengine
CONTAINER_TESTDIR=/home/TDengine
SIM_DIR=/home/TDengine/sim
REP_MOUNT_PARAM="$REPDIR:/home/TDengine"
fi
ulimit -c unlimited
TMP_DIR=$WORKDIR/tmp
MOUNT_DIR=""
rm -rf ${TMP_DIR}/thread_volume/$thread_no/sim
mkdir -p ${TMP_DIR}/thread_volume/$thread_no/sim/tsim
mkdir -p ${TMP_DIR}/thread_volume/$thread_no/coredump
rm -rf ${TMP_DIR}/thread_volume/$thread_no/coredump/*
if [ ! -d "${TMP_DIR}/thread_volume/$thread_no/$exec_dir" ]; then
subdir=`echo "$exec_dir"|cut -d/ -f1`
echo "cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/"
cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/
fi
MOUNT_DIR="$TMP_DIR/thread_volume/$thread_no/$exec_dir:$CONTAINER_TESTDIR/tests/$exec_dir"
echo "$thread_no -> ${exec_dir}:$cmd"
coredump_dir=`cat /proc/sys/kernel/core_pattern | xargs dirname`
docker run \
-v $REP_MOUNT_PARAM \
-v $MOUNT_DIR \
-v "$TMP_DIR/thread_volume/$thread_no/sim:${SIM_DIR}" \
-v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \
-v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \
--rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $extra_param
ret=$?
exit $ret
......@@ -63,6 +63,7 @@
# ---- tstream
./test.sh -f tsim/tstream/basic0.sim
./test.sh -f tsim/tstream/basic1.sim
# ---- transaction
./test.sh -f tsim/trans/create_db.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 1000
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 2 then
print ======$data13
return -1
endi
if $data14 != 2 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 12 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
print ======$data11
return -1
endi
if $data12 != 2 then
print ======$data12
return -1
endi
if $data13 != 24 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 36 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 6 then
print ======$data13
return -1
endi
if $data14 != 3 then
print ======$data14
return -1
endi
if $data15 != 1 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
if $data21 != 2 then
print ======$data21
return -1
endi
if $data22 != 2 then
print ======$data22
return -1
endi
if $data23 != 6 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -138,7 +138,8 @@ class TDTestCase:
if "2: service ok" in retVal:
tdLog.info("taos -k success")
else:
tdLog.exit("taos -k fail")
tdLog.info(retVal)
tdLog.exit("taos -k fail 1")
# stop taosd
tdDnodes.stop(1)
......@@ -149,7 +150,8 @@ class TDTestCase:
if "0: unavailable" in retVal:
tdLog.info("taos -k success")
else:
tdLog.exit("taos -k fail")
tdLog.info(retVal)
tdLog.exit("taos -k fail 2")
# restart taosd
tdDnodes.start(1)
......@@ -158,7 +160,8 @@ class TDTestCase:
if "2: service ok" in retVal:
tdLog.info("taos -k success")
else:
tdLog.exit("taos -k fail")
tdLog.info(retVal)
tdLog.exit("taos -k fail 3")
tdLog.printNoPrefix("================================ parameter: -n")
# stop taosd
......
......@@ -15,8 +15,16 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method
tdSql.prepare()
# get system timezone
time_zone = os.popen('timedatectl | grep zone').read(
).strip().split(':')[1].lstrip()
time_zone_arr = os.popen('timedatectl | grep zone').read(
).strip().split(':')
if len(time_zone_arr) > 1:
time_zone = time_zone_arr[1].lstrip()
else:
# possibly in a docker container
time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip()
time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip()
time_zone = time_zone_1 + " " + time_zone_2
print("expected time zone: " + time_zone)
tdLog.printNoPrefix("==========step1:create tables==========")
tdSql.execute(
......
此差异已折叠。
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
#rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
#tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def newcur(self,cfg,host,port):
user = "root"
password = "taosdata"
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
cur=con.cursor()
print(cur)
return cur
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName)
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
if (i > 0) and (i%100 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
event.set()
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
print ("input parameters:")
print (parameterDict)
# create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
self.create_tables(tsql,\
parameterDict["dbName"],\
parameterDict["vgroups"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"])
self.insert_data(tsql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\
parameterDict["startTs"])
return
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 1: Produce while consume to subscribe one db")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db1', \
'vgroups': 4, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
tdLog.info("create topics from db")
topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
event.wait()
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3)))
tdSql.checkData(0 , 1, consumerId)
# mulit rows and mulit tables in one sql, this num of msg is not sure
#tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath)
#self.tmqCase2(cfgPath, buildPath)
#self.tmqCase3(cfgPath, buildPath)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -e enterprise edition"
echo -e "\t -h help"
}
ent=0
while getopts "eh" opt; do
case $opt in
e)
ent=1
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
script_dir=`dirname $0`
cd ${script_dir}
PWD=`pwd`
if [ $ent -eq 0 ]; then
cd ../../debug
else
cd ../../../debug
fi
ctest -j8
ret=$?
exit $ret
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册