提交 50e9656d 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/stream

......@@ -56,7 +56,8 @@ typedef enum EStreamType {
STREAM_CLEAR,
STREAM_INVALID,
STREAM_GET_ALL,
STREAM_DELETE,
STREAM_DELETE_RESULT,
STREAM_DELETE_DATA,
STREAM_RETRIEVE,
STREAM_PULL_DATA,
STREAM_PULL_OVER,
......
......@@ -20,6 +20,12 @@
extern "C" {
#endif
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following sectio
#ifndef ALLOW_FORBID_FUNC
#define qsort QSORT_FUNC_TAOS_FORBID
#endif
#define TPOW2(x) ((x) * (x))
#define TABS(x) ((x) > 0 ? (x) : -(x))
......
......@@ -67,7 +67,7 @@ bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs
int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes);
TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4);
bool taosValidateEncodec(const char *encodec);
int32_t taosHexEncode(const char *src, char *dst, int32_t len);
int32_t taosHexEncode(const unsigned char *src, char *dst, int32_t len);
int32_t taosHexDecode(const char *src, char *dst, int32_t len);
int32_t taosWcharWidth(TdWchar wchar);
......
......@@ -1259,6 +1259,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
dst->info.rows = src->info.rows;
dst->info.window = src->info.window;
dst->info.type = src->info.type;
return TSDB_CODE_SUCCESS;
}
......
......@@ -21,7 +21,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
if (pVnode == NULL || pVnode->dropped) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
......@@ -81,16 +81,18 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
vmReleaseVnode(pMgmt, pVnode);
while (pVnode->refCount > 0) taosMsleep(10);
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dTrace("vgId:%d, vnode-fetch queue is empty", pVnode->vgId);
vmFreeQueue(pMgmt, pVnode);
vnodeClose(pVnode->pImpl);
pVnode->pImpl = NULL;
dDebug("vgId:%d, vnode is closed", pVnode->vgId);
if (pVnode->dropped) {
......
......@@ -107,7 +107,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); // no response here
int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); // no response here
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
......@@ -146,8 +146,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
TMSG_INFO(pMsg->msgType));
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, msgtype:%s qtype:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype);
return terrno != 0 ? terrno : -1;
}
......
......@@ -38,6 +38,8 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
#ifdef __cplusplus
}
#endif
......
......@@ -21,6 +21,7 @@
#include "mndShow.h"
#include "mndSma.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
......@@ -927,6 +928,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
......@@ -947,7 +949,6 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mndTransSetRpcRsp(pTrans, pRsp, rspLen);
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
......
......@@ -523,6 +523,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.sourceDbUid = pDb->uid;
streamObj.targetDbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
streamObj.smaId = smaObj.uid;
......@@ -853,36 +854,26 @@ _OVER:
}
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
SSmaObj *pSma = NULL;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SSmaObj *pSma = NULL;
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break;
if (pSma->dbUid == pDb->uid) {
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER;
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
mndReleaseVgroup(pMnode, pVgroup);
pVgroup = NULL;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) {
sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pSma);
return -1;
}
}
sdbRelease(pSdb, pSma);
}
code = 0;
_OVER:
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pSma);
mndReleaseVgroup(pMnode, pVgroup);
return code;
return 0;
}
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
......
......@@ -674,27 +674,29 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
void *pIter = NULL;
SStreamObj *pStream = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
if (pStream->sourceDbUid != pStream->targetDbUid) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
return -1;
} else {
// TODO drop all task on snode
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}
} else {
sdbRelease(pSdb, pStream);
continue;
}
#if 0
......
......@@ -90,7 +90,7 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
for (int32_t i = 0; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->actionType == TRANS_ACTION_RAW) {
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
rawDataLen += (sizeof(STransAction) + sdbGetRawTotalSize(pAction->pRaw));
} else if (pAction->actionType == TRANS_ACTION_MSG) {
rawDataLen += (sizeof(STransAction) + pAction->contLen);
} else {
......@@ -105,7 +105,7 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen;
rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
......@@ -226,7 +226,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
_OVER:
if (terrno != 0) {
mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
mError("trans:%d, failed to encode to raw:%p maxlen:%d len:%d since %s", pTrans->id, pRaw, sdbGetRawTotalSize(pRaw),
dataPos, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
......@@ -1025,7 +1026,7 @@ static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
pTrans->lastErrorNo == 0;
pTrans->lastErrorNo = 0;
return 0;
}
......
......@@ -185,5 +185,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
.contLen = ntohl(pReq->length),
};
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
......@@ -428,7 +428,7 @@ typedef struct SIntervalAggOperatorInfo {
STimeWindowAggSupp twAggSup;
bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool ignoreCloseWindow;
bool ignoreExpiredData;
} SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo {
......@@ -450,7 +450,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool ignoreCloseWindow;
bool ignoreExpiredData;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......@@ -588,7 +588,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreCloseWindow;
bool ignoreExpiredData;
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
......@@ -632,7 +632,7 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator;
SArray* pScanWindow;
SArray* pChildren; // cache for children's result;
bool ignoreCloseWindow;
bool ignoreExpiredData;
} SStreamStateAggOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
......
......@@ -3253,6 +3253,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator);
break;
}
if (pBlock->info.type == STREAM_RETRIEVE) {
// for stream interval
return pBlock;
}
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
......
......@@ -807,6 +807,23 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
}
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
ASSERT(rowIndex < pBlock->info.rows);
switch (pBlock->info.type)
{
case STREAM_RETRIEVE: {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
pInfo->groupId = groupCol[rowIndex];
}
break;
case STREAM_DELETE_DATA:
break;
default:
break;
}
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = {
.skey = INT64_MIN,
......@@ -829,6 +846,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
} else {
win =
getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
setGroupId(pInfo, pSDB, 2, *pRowIndex);
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
}
......@@ -1031,10 +1049,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else {
if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (!prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
}
}
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
......
......@@ -838,7 +838,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win);
int32_t ret = TSDB_CODE_SUCCESS;
if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) {
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
......@@ -871,7 +871,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
}
if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) {
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, pInfo->order);
......@@ -886,7 +886,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (startPos < 0) {
break;
}
if (pInfo->ignoreCloseWindow && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
......@@ -1535,7 +1535,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = *pTwAggSupp;
pInfo->ignoreCloseWindow = false;
pInfo->ignoreExpiredData = pPhyNode->window.igExpired;
if (pPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
......@@ -2292,7 +2292,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
pInfo->interval.precision, NULL);
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if (pInfo->ignoreCloseWindow && isClosed) {
if (pInfo->ignoreExpiredData && isClosed) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
......@@ -2710,7 +2710,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock();
pInfo->ignoreCloseWindow = false;
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
......@@ -2852,12 +2852,12 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
pInfo->pDelRes->info.type = STREAM_DELETE;
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
blockDataEnsureCapacity(pInfo->pDelRes, 64);
pInfo->pChildren = NULL;
pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode;
pInfo->ignoreCloseWindow = false;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
......@@ -3133,7 +3133,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < pSDataBlock->info.rows;) {
if (pInfo->ignoreCloseWindow && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
i++;
continue;
}
......@@ -3413,8 +3413,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForSession, pInfo->ignoreCloseWindow);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
getResWinForSession, pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated);
......@@ -3822,7 +3822,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) {
if (pInfo->ignoreCloseWindow && isOverdue(tsCols[i], &pInfo->twAggSup)) {
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
i++;
continue;
}
......@@ -3866,12 +3866,14 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
printDataBlock(pBInfo->pRes, "single state");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
......@@ -3884,6 +3886,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) {
break;
}
printDataBlock(pBlock, "single state recv");
if (pBlock->info.type == STREAM_CLEAR) {
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
......@@ -3903,8 +3906,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForState, pInfo->ignoreCloseWindow);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
getResWinForState, pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated);
......@@ -3914,9 +3917,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
printDataBlock(pBInfo->pRes, "single state");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
......@@ -3975,10 +3980,10 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
pInfo->pDelRes->info.type = STREAM_DELETE;
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
blockDataEnsureCapacity(pInfo->pDelRes, 64);
pInfo->pChildren = NULL;
pInfo->ignoreCloseWindow = false;
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pOperator->name = "StreamStateAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
......
......@@ -39,6 +39,174 @@ static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName);
}
#define TIME_UNIT_INVALID 1
#define TIME_UNIT_TOO_SMALL 2
static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) {
if (!pVal->isDuration) {
return TIME_UNIT_INVALID;
}
if (TSDB_TIME_PRECISION_MILLI == dbPrec && 0 == strcasecmp(pVal->literal, "1u")) {
return TIME_UNIT_TOO_SMALL;
}
if (pVal->literal[0] != '1' || (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' &&
pVal->literal[1] != 's' && pVal->literal[1] != 'm' &&
pVal->literal[1] != 'h' && pVal->literal[1] != 'd' &&
pVal->literal[1] != 'w')) {
return TIME_UNIT_INVALID;
}
return TSDB_CODE_SUCCESS;
}
/* Following are valid ISO-8601 timezone format:
* 1 z/Z
* 2 ±hh:mm
* 3 ±hhmm
* 4 ±hh
*
*/
static bool validateHourRange(int8_t hour) {
if (hour < 0 || hour > 12) {
return false;
}
return true;
}
static bool validateMinuteRange(int8_t hour, int8_t minute, char sign) {
if (minute == 0 || (minute == 30 && (hour == 3 || hour == 5) && sign == '+')) {
return true;
}
return false;
}
static bool validateTimestampDigits(const SValueNode* pVal) {
if (!IS_INTEGER_TYPE(pVal->node.resType.type)) {
return false;
}
int64_t tsVal = pVal->datum.i;
char fraction[20] = {0};
NUM_TO_STRING(pVal->node.resType.type, &tsVal, sizeof(fraction), fraction);
int32_t tsDigits = (int32_t)strlen(fraction);
if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) {
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS || tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS ||
tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
return true;
} else {
return false;
}
}
return true;
}
static bool validateTimezoneFormat(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
}
char* tz = varDataVal(pVal->datum.p);
int32_t len = varDataLen(pVal->datum.p);
char buf[3] = {0};
int8_t hour = -1, minute = -1;
if (len == 0) {
return false;
} else if (len == 1 && (tz[0] == 'z' || tz[0] == 'Z')) {
return true;
} else if ((tz[0] == '+' || tz[0] == '-')) {
switch (len) {
case 3:
case 5: {
for (int32_t i = 1; i < len; ++i) {
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 4) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
case 6: {
for (int32_t i = 1; i < len; ++i) {
if (i == 3) {
if (tz[i] != ':') {
return false;
}
continue;
}
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 5) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
default: {
return false;
}
}
} else {
return false;
}
return true;
}
void static addTimezoneParam(SNodeList* pList) {
char buf[6] = {0};
time_t t = taosTime(NULL);
struct tm* tmInfo = taosLocalTime(&t, NULL);
strftime(buf, sizeof(buf), "%z", tmInfo);
int32_t len = (int32_t)strlen(buf);
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = strndup(buf, len);
pVal->isDuration = false;
pVal->translate = true;
pVal->node.resType.type = TSDB_DATA_TYPE_BINARY;
pVal->node.resType.bytes = len + VARSTR_HEADER_SIZE;
pVal->node.resType.precision = TSDB_TIME_PRECISION_MILLI;
pVal->datum.p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE + 1);
varDataSetLen(pVal->datum.p, len);
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
nodesListAppend(pList, (SNode*)pVal);
}
void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = NULL;
......@@ -525,9 +693,15 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i == 0) {
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode *)nodesListGetNode(pFunc->pParameterList, 1));
if (ret == TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
......@@ -843,6 +1017,19 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (numOfParams == 4) {
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode *)nodesListGetNode(pFunc->pParameterList, 3));
if (ret == TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
// set result type
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
......@@ -1284,152 +1471,6 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS;
}
/* Following are valid ISO-8601 timezone format:
* 1 z/Z
* 2 ±hh:mm
* 3 ±hhmm
* 4 ±hh
*
*/
static bool validateHourRange(int8_t hour) {
if (hour < 0 || hour > 12) {
return false;
}
return true;
}
static bool validateMinuteRange(int8_t hour, int8_t minute, char sign) {
if (minute == 0 || (minute == 30 && (hour == 3 || hour == 5) && sign == '+')) {
return true;
}
return false;
}
static bool validateTimestampDigits(const SValueNode* pVal) {
if (!IS_INTEGER_TYPE(pVal->node.resType.type)) {
return false;
}
int64_t tsVal = pVal->datum.i;
char fraction[20] = {0};
NUM_TO_STRING(pVal->node.resType.type, &tsVal, sizeof(fraction), fraction);
int32_t tsDigits = (int32_t)strlen(fraction);
if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) {
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS || tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS ||
tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
return true;
} else {
return false;
}
}
return true;
}
static bool validateTimezoneFormat(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
}
char* tz = varDataVal(pVal->datum.p);
int32_t len = varDataLen(pVal->datum.p);
char buf[3] = {0};
int8_t hour = -1, minute = -1;
if (len == 0) {
return false;
} else if (len == 1 && (tz[0] == 'z' || tz[0] == 'Z')) {
return true;
} else if ((tz[0] == '+' || tz[0] == '-')) {
switch (len) {
case 3:
case 5: {
for (int32_t i = 1; i < len; ++i) {
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 4) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
case 6: {
for (int32_t i = 1; i < len; ++i) {
if (i == 3) {
if (tz[i] != ':') {
return false;
}
continue;
}
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 5) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
default: {
return false;
}
}
} else {
return false;
}
return true;
}
void static addTimezoneParam(SNodeList* pList) {
char buf[6] = {0};
time_t t = taosTime(NULL);
struct tm* tmInfo = taosLocalTime(&t, NULL);
strftime(buf, sizeof(buf), "%z", tmInfo);
int32_t len = (int32_t)strlen(buf);
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = strndup(buf, len);
pVal->isDuration = false;
pVal->translate = true;
pVal->node.resType.type = TSDB_DATA_TYPE_BINARY;
pVal->node.resType.bytes = len + VARSTR_HEADER_SIZE;
pVal->node.resType.precision = TSDB_TIME_PRECISION_MILLI;
pVal->datum.p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE + 1);
varDataSetLen(pVal->datum.p, len);
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
nodesListAppend(pList, (SNode*)pVal);
}
static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (1 != numOfParams && 2 != numOfParams) {
......@@ -1498,6 +1539,16 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode *)nodesListGetNode(pFunc->pParameterList, 1));
if (ret == TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType =
......@@ -1526,6 +1577,18 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
if (3 == numOfParams) {
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode *)nodesListGetNode(pFunc->pParameterList, 2));
if (ret == TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
......
......@@ -35,7 +35,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
endif(${BUILD_WITH_INVERTEDINDEX})
if (${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
# if (${BUILD_TEST})
# add_subdirectory(test)
# endif(${BUILD_TEST})
......@@ -196,7 +196,7 @@ int32_t taosUcs4len(TdUcs4 *ucs4) {
}
//dst buffer size should be at least 2*len + 1
int32_t taosHexEncode(const char *src, char *dst, int32_t len) {
int32_t taosHexEncode(const unsigned char *src, char *dst, int32_t len) {
if (!dst) {
return -1;
}
......
......@@ -93,6 +93,7 @@
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
# ./test.sh -f tsim/stream/distributesession0.sim
......@@ -105,6 +106,7 @@
./test.sh -f tsim/stream/partitionby1.sim
./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
# ---- transaction
./test.sh -f tsim/trans/lossdata1.sim
......@@ -159,6 +161,7 @@
#./test.sh -f tsim/mnode/basic1.sim -m
# --- sma
./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
......
......@@ -135,7 +135,7 @@ echo "jniDebugFlag 143" >> $TAOS_CFG
echo "qDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG
echo "uDebugFlag 131" >> $TAOS_CFG
echo "sDebugFlag 143" >> $TAOS_CFG
echo "wDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG
......
......@@ -64,12 +64,59 @@ sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) in
print --> drop stb
sql drop table stb;
print ========== step5
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint, c_float float, c_double double, c_bool bool, c_binary binary(16), c_nchar nchar(32), c_ts timestamp, c_tint_un tinyint unsigned, c_sint_un smallint unsigned, c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
print ========== step6 repeat
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint ) tags (t_int int);
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
print ========== step7
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql create table ct1 using stb1 tags ( 1 );
sql create table ct2 using stb1 tags ( 2 );
sql create table ct3 using stb1 tags ( 3 );
sql create table ct4 using stb1 tags ( 4 );
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
sql CREATE SMA INDEX sma_index_2 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) max_delay 6m;
sql CREATE SMA INDEX sma_index_3 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) watermark 5s max_delay 6m;
sql DROP INDEX sma_index_1 ;
sql DROP INDEX sma_index_2 ;
sql DROP INDEX sma_index_3 ;
print ========== step8
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql create table ct1 using stb1 tags ( 1 );
sql create table ct2 using stb1 tags ( 2 );
sql create table ct3 using stb1 tags ( 3 );
sql create table ct4 using stb1 tags ( 4 );
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
sql CREATE SMA INDEX sma_index_2 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) max_delay 6m;
sql CREATE SMA INDEX sma_index_3 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) watermark 5s max_delay 6m;
sql DROP INDEX sma_index_1 ;
sql DROP INDEX sma_index_2 ;
sql DROP INDEX sma_index_3 ;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode4 -s stop -x SIGINT
\ No newline at end of file
......@@ -83,9 +83,9 @@ sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0)
$loop_count = 0
loop1:
sleep 300
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
......
......@@ -10,6 +10,30 @@ sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
......
此差异已折叠。
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED into streamt1 as select _wstartts, count(*) c1, sum(a) c3 from t1 interval(10s);
sql create stream streams2 trigger at_once IGNORE EXPIRED into streamt2 as select _wstartts, count(*) c1, sum(a) c3 from t1 session(ts,10s);
sql create stream streams3 trigger at_once IGNORE EXPIRED into streamt3 as select _wstartts, count(*) c1, sum(a) c3 from t1 state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.1);
sql insert into t1 values(1648791233002,2,2,3,2.1);
sql insert into t1 values(1648791243003,2,2,3,3.1);
sql insert into t1 values(1648791200000,4,2,3,4.1);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop1
endi
$loop_count = 0
loop2:
sleep 300
sql select * from streamt2;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop2
endi
$loop_count = 0
loop3:
sleep 300
sql select * from streamt3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop3
endi
print =============== create database
sql create database test1 vgroups 4
sql show databases
print ======database=$rows
sql use test1
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql create stream stream_t2 trigger at_once IGNORE EXPIRED into streamtST2 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791222001,2,2,3);
$loop_count = 0
loop4:
sleep 300
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop4
endi
if $data02 != 1 then
print =====data02=$data02
goto loop4
endi
$loop_count = 0
loop5:
sleep 300
sql select * from streamtST2;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop5
endi
if $data02 != 1 then
print =====data02=$data02
goto loop5
endi
system sh/stop_dnodes.sh
\ No newline at end of file
......@@ -78,21 +78,29 @@ class TDTestCase:
password = "taosdata"
port =6030
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
print(con)
tdLog.debug(con)
return con
def stmtExe(self,conn,sql,bindStat):
queryStat=conn.statement("%s"%sql)
queryStat.bind_param(bindStat)
queryStat.execute()
result=queryStat.use_result()
rows=result.fetch_all()
return rows
def test_stmt_set_tbname_tag(self,conn):
dbname = "stmt_set_tbname_tag"
dbname = "stmt_tag"
stablename = 'log'
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s PRECISION 'us' " % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
conn.execute("create table if not exists %s(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , vc varchar(100)) tags (t1 timestamp, t2 bool,\
t3 tinyint, t4 tinyint, t5 smallint, t6 int, t7 bigint, t8 tinyint unsigned, t9 smallint unsigned, \
t10 int unsigned, t11 bigint unsigned, t12 float, t13 double, t14 binary(100), t15 nchar(100), t16 timestamp)")
t10 int unsigned, t11 bigint unsigned, t12 float, t13 double, t14 binary(100), t15 nchar(100), t16 timestamp)"%stablename)
stmt = conn.statement("insert into ? using log tags (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) \
values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
......@@ -139,142 +147,93 @@ class TDTestCase:
assert stmt.affected_rows == 3
#query all
querystmt1=conn.statement("select * from log where bu < ?")
queryparam1=new_bind_params(1)
print(type(queryparam1))
queryparam1[0].int(10)
querystmt1.bind_param(queryparam1)
querystmt1.execute()
result1=querystmt1.use_result()
rows1=result1.fetch_all()
print(rows1[0])
print(rows1[1])
print(rows1[2])
assert str(rows1[0][0]) == "2021-07-21 17:56:32.589111"
assert rows1[0][10] == 3
assert rows1[1][10] == 4
queryparam=new_bind_params(1)
queryparam[0].int(10)
rows=self.stmtExe(conn,"select * from log where bu < ?",queryparam)
tdLog.debug("assert 1st case %s"%rows)
assert str(rows[0][0]) == "2021-07-21 17:56:32.589111"
assert rows[0][10] == 3 , '1st case is failed'
assert rows[1][10] == 4 , '1st case is failed'
#query: Numeric Functions
querystmt2=conn.statement("select abs(?) from log where bu < ?")
queryparam2=new_bind_params(2)
print(type(queryparam2))
queryparam2[0].int(5)
queryparam2[1].int(5)
querystmt2.bind_param(queryparam2)
querystmt2.execute()
result2=querystmt2.use_result()
rows2=result2.fetch_all()
print("2",rows2)
assert rows2[0][0] == 5
assert rows2[1][0] == 5
queryparam=new_bind_params(2)
queryparam[0].int(5)
queryparam[1].int(5)
rows=self.stmtExe(conn,"select abs(?) from log where bu < ?",queryparam)
tdLog.debug("assert 2nd case %s"%rows)
assert rows[0][0] == 5 , '2nd case is failed'
assert rows[1][0] == 5 , '2nd case is failed'
#query: Numeric Functions and escapes
querystmt3=conn.statement("select abs(?) from log where nn= 'a? long string with 中文字符' ")
queryparam3=new_bind_params(1)
print(type(queryparam3))
queryparam3[0].int(5)
querystmt3.bind_param(queryparam3)
querystmt3.execute()
result3=querystmt3.use_result()
rows3=result3.fetch_all()
print("3",rows3)
assert rows3 == []
# #query: string Functions
# querystmt3=conn.statement("select CHAR_LENGTH(?) from log ")
# queryparam3=new_bind_params(1)
# print(type(queryparam3))
# queryparam3[0].binary('中文字符')
# querystmt3.bind_param(queryparam3)
# querystmt3.execute()
# result3=querystmt3.use_result()
# rows3=result3.fetch_all()
# print("4",rows3)
# assert rows3[0][0] == 12, 'fourth case is failed'
# assert rows3[1][0] == 12, 'fourth case is failed'
# #query: conversion Functions
# querystmt4=conn.statement("select cast( ? as bigint) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].binary('1232a')
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("5",rows4)
# assert rows4[0][0] == 1232
# assert rows4[1][0] == 1232
# querystmt4=conn.statement("select cast( ? as binary(10)) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].int(123)
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("6",rows4)
# assert rows4[0][0] == '123'
# assert rows4[1][0] == '123'
# #query: datatime Functions
# querystmt4=conn.statement(" select timediff('2021-07-21 17:56:32.590111',?,1s) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].timestamp(1626861392591111)
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("7",rows4)
# assert rows4[0][0] == 1, 'seventh case is failed'
# assert rows4[1][0] == 1, 'seventh case is failed'
queryparam=new_bind_params(1)
queryparam[0].int(5)
rows=self.stmtExe(conn,"select abs(?) from log where nn= 'a? long string with 中文字符'",queryparam)
tdLog.debug("assert 3rd case %s"%rows)
assert rows == [] , '3rd case is failed'
#query: string Functions
queryparam=new_bind_params(1)
queryparam[0].binary('中文字符')
rows=self.stmtExe(conn,"select CHAR_LENGTH(?) from log ",queryparam)
tdLog.debug("assert 4th case %s"%rows)
assert rows[0][0] == 12, '4th case is failed'
assert rows[1][0] == 12, '4th case is failed'
queryparam=new_bind_params(1)
queryparam[0].binary('123')
rows=self.stmtExe(conn,"select CHAR_LENGTH(?) from log ",queryparam)
tdLog.debug("assert 4th case %s"%rows)
assert rows[0][0] == 3, '4th.1 case is failed'
assert rows[1][0] == 3, '4th.1 case is failed'
#query: conversion Functions
queryparam=new_bind_params(1)
queryparam[0].binary('1232a')
rows=self.stmtExe(conn,"select cast( ? as bigint) from log",queryparam)
tdLog.debug("assert 5th case %s"%rows)
assert rows[0][0] == 1232, '5th.1 case is failed'
assert rows[1][0] == 1232, '5th.1 case is failed'
querystmt4=conn.statement("select cast( ? as binary(10)) from log ")
queryparam=new_bind_params(1)
queryparam[0].int(123)
rows=self.stmtExe(conn,"select cast( ? as bigint) from log",queryparam)
tdLog.debug("assert 6th case %s"%rows)
assert rows[0][0] == 123, '6th.1 case is failed'
assert rows[1][0] == 123, '6th.1 case is failed'
#query: datatime Functions
queryparam=new_bind_params(1)
queryparam[0].timestamp(1626861392591112)
rows=self.stmtExe(conn,"select timediff('2021-07-21 17:56:32.590111',?,1a) from log",queryparam)
tdLog.debug("assert 7th case %s"%rows)
assert rows[0][0] == 1, '7th case is failed'
assert rows[1][0] == 1, '7th case is failed'
#query: aggregate Functions
querystmt4=conn.statement(" select count(?) from log ")
queryparam4=new_bind_params(1)
print(type(queryparam4))
queryparam4[0].int(123)
querystmt4.bind_param(queryparam4)
querystmt4.execute()
result4=querystmt4.use_result()
rows4=result4.fetch_all()
print("8",rows4)
assert rows4[0][0] == 3, ' 8 case is failed'
#query: selector Functions 9
querystmt4=conn.statement(" select bottom(bu,?) from log group by bu ; ")
queryparam4=new_bind_params(1)
print(type(queryparam4))
queryparam4[0].int(2)
querystmt4.bind_param(queryparam4)
querystmt4.execute()
result4=querystmt4.use_result()
rows4=result4.fetch_all()
print("9",rows4)
assert rows4[0][0] == 4, ' 9 case is failed'
assert rows4[1][0] == 3, ' 9 case is failed'
queryparam=new_bind_params(1)
queryparam[0].int(123)
rows=self.stmtExe(conn,"select count(?) from log ",queryparam)
tdLog.debug("assert 8th case %s"%rows)
assert rows[0][0] == 3, ' 8th case is failed'
# #query: selector Functions 9
# queryparam=new_bind_params(1)
# queryparam[0].int(2)
# rows=self.stmtExe(conn,"select bottom(bu,?) from log group by bu ; ",queryparam)
# tdLog.debug("assert 9th case %s"%rows)
# assert rows[0][0] == 4, ' 9 case is failed'
# assert rows[1][0] == 3, ' 9 case is failed'
# #query: time-series specific Functions 10
querystmt4=conn.statement(" select twa(?) from log; ")
queryparam4=new_bind_params(1)
print(type(queryparam4))
queryparam4[0].int(15)
querystmt4.bind_param(queryparam4)
querystmt4.execute()
result4=querystmt4.use_result()
rows4=result4.fetch_all()
print("10",rows4)
assert rows4[0][0] == 15, ' 10 case is failed'
querystmt=conn.statement(" select twa(?) from log; ")
queryparam=new_bind_params(1)
queryparam[0].int(15)
rows=self.stmtExe(conn," select twa(?) from log; ",queryparam)
tdLog.debug("assert 10th case %s"%rows)
assert rows[0][0] == 15, ' 10th case is failed'
# conn.execute("drop database if exists %s" % dbname)
......
......@@ -3,7 +3,7 @@ from util.sql import *
from util.cases import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
......@@ -33,7 +33,7 @@ class TDTestCase:
'insert into ntb values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())')
tdSql.execute(
'insert into stb_1 values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())')
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00') from ntb")
tdSql.checkRows(3)
tdSql.query("select timediff(1,0,1d) from ntb")
......@@ -72,12 +72,12 @@ class TDTestCase:
tdSql.query("select timediff(1,0,1a) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1000)
tdSql.query("select timediff(1,0,1u) from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1000000)
tdSql.query("select timediff(1,0,1u) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1000000)
tdSql.error("select timediff(1,0,1u) from ntb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,1000000)
tdSql.error("select timediff(1,0,1u) from db.ntb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,1000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00') from stb")
tdSql.checkRows(3)
......@@ -116,12 +116,12 @@ class TDTestCase:
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1a) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from stb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,86400000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from db.stb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,86400000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00') from stb_1")
......@@ -164,12 +164,12 @@ class TDTestCase:
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1a) from db.stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from db.stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from stb_1")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,43200000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from db.stb_1")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,43200000000)
tdSql.query("select timediff('a','b') from stb")
tdSql.checkRows(3)
......@@ -202,4 +202,4 @@ class TDTestCase:
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
tdCases.addWindows(__file__, TDTestCase())
......@@ -18,7 +18,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def prepare_datas(self):
tdSql.execute(
'''create table stb1
......@@ -26,7 +26,7 @@ class TDTestCase:
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
......@@ -68,7 +68,7 @@ class TDTestCase:
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
def test_errors(self):
error_sql_lists = [
# "select stateduration(c1,'GT',5,1s) from t1"
......@@ -110,35 +110,35 @@ class TDTestCase:
for error_sql in error_sql_lists:
tdSql.error(error_sql)
pass
def support_types(self):
other_no_value_types = [
"select stateduration(ts,'GT',1,1s) from t1" ,
"select stateduration(ts,'GT',1,1s) from t1" ,
"select stateduration(c7,'GT',1,1s) from t1",
"select stateduration(c8,'GT',1,1s) from t1",
"select stateduration(c9,'GT',1,1s) from t1",
"select stateduration(ts,'GT',1,1s) from ct1" ,
"select stateduration(ts,'GT',1,1s) from ct1" ,
"select stateduration(c7,'GT',1,1s) from ct1",
"select stateduration(c8,'GT',1,1s) from ct1",
"select stateduration(c9,'GT',1,1s) from ct1",
"select stateduration(ts,'GT',1,1s) from ct3" ,
"select stateduration(ts,'GT',1,1s) from ct3" ,
"select stateduration(c7,'GT',1,1s) from ct3",
"select stateduration(c8,'GT',1,1s) from ct3",
"select stateduration(c9,'GT',1,1s) from ct3",
"select stateduration(ts,'GT',1,1s) from ct4" ,
"select stateduration(ts,'GT',1,1s) from ct4" ,
"select stateduration(c7,'GT',1,1s) from ct4",
"select stateduration(c8,'GT',1,1s) from ct4",
"select stateduration(c9,'GT',1,1s) from ct4",
"select stateduration(ts,'GT',1,1s) from stb1 partition by tbname" ,
"select stateduration(ts,'GT',1,1s) from stb1 partition by tbname" ,
"select stateduration(c7,'GT',1,1s) from stb1 partition by tbname",
"select stateduration(c8,'GT',1,1s) from stb1 partition by tbname",
"select stateduration(c9,'GT',1,1s) from stb1 partition by tbname"
"select stateduration(c9,'GT',1,1s) from stb1 partition by tbname"
]
for type_sql in other_no_value_types:
tdSql.error(type_sql)
tdLog.info("support type ok , sql is : %s"%type_sql)
type_sql_lists = [
"select stateduration(c1,'GT',1,1s) from t1",
"select stateduration(c2,'GT',1,1s) from t1",
......@@ -168,8 +168,8 @@ class TDTestCase:
"select stateduration(c5,'GT',1,1s) from stb1 partition by tbname",
"select stateduration(c6,'GT',1,1s) from stb1 partition by tbname",
"select stateduration(c6,'GT',1,1s) as alisb from stb1 partition by tbname",
"select stateduration(c6,'GT',1,1s) alisb from stb1 partition by tbname",
"select stateduration(c6,'GT',1,1s) as alisb from stb1 partition by tbname",
"select stateduration(c6,'GT',1,1s) alisb from stb1 partition by tbname",
]
for type_sql in type_sql_lists:
......@@ -177,7 +177,7 @@ class TDTestCase:
def support_opers(self):
oper_lists = ['LT','lt','Lt','lT','GT','gt','Gt','gT','LE','le','Le','lE','GE','ge','Ge','gE','NE','ne','Ne','nE','EQ','eq','Eq','eQ']
oper_errors = [",","*","NULL","tbname","ts","sum","_c0"]
for oper in oper_lists:
......@@ -190,7 +190,7 @@ class TDTestCase:
def basic_stateduration_function(self):
# basic query
# basic query
tdSql.query("select c1 from ct3")
tdSql.checkRows(0)
tdSql.query("select c1 from t1")
......@@ -211,9 +211,9 @@ class TDTestCase:
tdSql.checkRows(0)
tdSql.query("select stateduration(c6,'GT',1,1s) from ct3")
# will support _rowts mix with
# will support _rowts mix with
# tdSql.query("select (c6,'GT',1,1s),_rowts from ct3")
# auto check for t1 table
# used for regular table
tdSql.query("select stateduration(c6,'GT',1,1s) from t1")
......@@ -229,17 +229,17 @@ class TDTestCase:
tdSql.error("select stateduration(c6,'GT',1,1s),tbname from ct1")
tdSql.error("select stateduration(c6,'GT',1,1s),t1 from ct1")
# unique with common col
# unique with common col
tdSql.error("select stateduration(c6,'GT',1,1s) ,ts from ct1")
tdSql.error("select stateduration(c6,'GT',1,1s) ,c1 from ct1")
# unique with scalar function
# unique with scalar function
tdSql.error("select stateduration(c6,'GT',1,1s) ,abs(c1) from ct1")
tdSql.error("select stateduration(c6,'GT',1,1s) , unique(c2) from ct1")
tdSql.error("select stateduration(c6,'GT',1,1s) , abs(c2)+2 from ct1")
# unique with aggregate function
# unique with aggregate function
tdSql.error("select stateduration(c6,'GT',1,1s) ,sum(c1) from ct1")
tdSql.error("select stateduration(c6,'GT',1,1s) ,max(c1) from ct1")
tdSql.error("select stateduration(c6,'GT',1,1s) ,csum(c1) from ct1")
......@@ -262,16 +262,16 @@ class TDTestCase:
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, 6134400)
tdSql.checkData(6, 0, -1)
# unique with union all
# unique with union all
tdSql.query("select stateduration(c1,'GT',1,1s) from ct4 union all select stateduration(c1,'GT',1,1s) from ct1")
tdSql.checkRows(25)
tdSql.query("select stateduration(c1,'GT',1,1s) from ct4 union all select distinct(c1) from ct4")
tdSql.checkRows(22)
# unique with join
# prepare join datas with same ts
# unique with join
# prepare join datas with same ts
tdSql.execute(" use db ")
tdSql.execute(" create stable st1 (ts timestamp , num int) tags(ind int)")
......@@ -328,7 +328,7 @@ class TDTestCase:
tdSql.checkRows(12)
tdSql.query("select stateduration(c1+2 ,'GT',1,1s) from t1")
tdSql.checkRows(12)
# bug for stable
#partition by tbname
......@@ -337,21 +337,20 @@ class TDTestCase:
# tdSql.query(" select unique(c1) from stb1 partition by tbname ")
# tdSql.checkRows(21)
# group by
# group by
tdSql.error("select stateduration(c1,'GT',1,1s) from ct1 group by c1")
tdSql.error("select stateduration(c1,'GT',1,1s) from ct1 group by tbname")
# super table
def check_unit_time(self):
tdSql.execute(" use db ")
tdSql.error("select stateduration(c1,'GT',1,1b) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1u) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.query("select stateduration(c1,'GT',1,1s) from t1")
tdSql.checkData(10,0,63072035)
tdSql.query("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.checkData(10,0,int(63072035/1000))
tdSql.query("select stateduration(c1,'GT',1,1m) from t1")
tdSql.checkData(10,0,int(63072035/60))
tdSql.query("select stateduration(c1,'GT',1,1h) from t1")
......@@ -360,8 +359,8 @@ class TDTestCase:
tdSql.checkData(10,0,int(63072035/60/24/60))
tdSql.query("select stateduration(c1,'GT',1,1w) from t1")
tdSql.checkData(10,0,int(63072035/60/7/24/60))
def check_boundary_values(self):
tdSql.execute("drop database if exists bound_test")
......
......@@ -675,7 +675,7 @@ class TDTestCase:
tdSql.checkRows(3)
tdSql.query("select TO_UNIXTIMESTAMP(datastr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TIMETRUNCATE(ts,1u) from jsons1 where jtag->'tag1'>1;")
tdSql.query("select TIMETRUNCATE(ts,1s) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TIMEDIFF(ts,_c0) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
......
......@@ -18,7 +18,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def prepare_datas(self):
tdSql.execute(
'''create table stb1
......@@ -26,7 +26,7 @@ class TDTestCase:
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
......@@ -68,7 +68,7 @@ class TDTestCase:
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
def test_errors(self):
error_sql_lists = [
# "select statecount(c1,'GT',5) from t1"
......@@ -110,35 +110,35 @@ class TDTestCase:
for error_sql in error_sql_lists:
tdSql.error(error_sql)
pass
def support_types(self):
other_no_value_types = [
"select statecount(ts,'GT',1) from t1" ,
"select statecount(ts,'GT',1) from t1" ,
"select statecount(c7,'GT',1) from t1",
"select statecount(c8,'GT',1) from t1",
"select statecount(c9,'GT',1) from t1",
"select statecount(ts,'GT',1) from ct1" ,
"select statecount(ts,'GT',1) from ct1" ,
"select statecount(c7,'GT',1) from ct1",
"select statecount(c8,'GT',1) from ct1",
"select statecount(c9,'GT',1) from ct1",
"select statecount(ts,'GT',1) from ct3" ,
"select statecount(ts,'GT',1) from ct3" ,
"select statecount(c7,'GT',1) from ct3",
"select statecount(c8,'GT',1) from ct3",
"select statecount(c9,'GT',1) from ct3",
"select statecount(ts,'GT',1) from ct4" ,
"select statecount(ts,'GT',1) from ct4" ,
"select statecount(c7,'GT',1) from ct4",
"select statecount(c8,'GT',1) from ct4",
"select statecount(c9,'GT',1) from ct4",
"select statecount(ts,'GT',1) from stb1 partition by tbname" ,
"select statecount(ts,'GT',1) from stb1 partition by tbname" ,
"select statecount(c7,'GT',1) from stb1 partition by tbname",
"select statecount(c8,'GT',1) from stb1 partition by tbname",
"select statecount(c9,'GT',1) from stb1 partition by tbname"
"select statecount(c9,'GT',1) from stb1 partition by tbname"
]
for type_sql in other_no_value_types:
tdSql.error(type_sql)
tdLog.info("support type ok , sql is : %s"%type_sql)
type_sql_lists = [
"select statecount(c1,'GT',1) from t1",
"select statecount(c2,'GT',1) from t1",
......@@ -168,8 +168,8 @@ class TDTestCase:
"select statecount(c5,'GT',1) from stb1 partition by tbname",
"select statecount(c6,'GT',1) from stb1 partition by tbname",
"select statecount(c6,'GT',1) as alisb from stb1 partition by tbname",
"select statecount(c6,'GT',1) alisb from stb1 partition by tbname",
"select statecount(c6,'GT',1) as alisb from stb1 partition by tbname",
"select statecount(c6,'GT',1) alisb from stb1 partition by tbname",
]
for type_sql in type_sql_lists:
......@@ -177,7 +177,7 @@ class TDTestCase:
def support_opers(self):
oper_lists = ['LT','lt','Lt','lT','GT','gt','Gt','gT','LE','le','Le','lE','GE','ge','Ge','gE','NE','ne','Ne','nE','EQ','eq','Eq','eQ']
oper_errors = [",","*","NULL","tbname","ts","sum","_c0"]
for oper in oper_lists:
......@@ -190,7 +190,7 @@ class TDTestCase:
def basic_statecount_function(self):
# basic query
# basic query
tdSql.query("select c1 from ct3")
tdSql.checkRows(0)
tdSql.query("select c1 from t1")
......@@ -211,9 +211,9 @@ class TDTestCase:
tdSql.checkRows(0)
tdSql.query("select statecount(c6,'GT',1) from ct3")
# will support _rowts mix with
# will support _rowts mix with
# tdSql.query("select (c6,'GT',1),_rowts from ct3")
# auto check for t1 table
# used for regular table
tdSql.query("select statecount(c6,'GT',1) from t1")
......@@ -229,17 +229,17 @@ class TDTestCase:
tdSql.error("select statecount(c6,'GT',1),tbname from ct1")
tdSql.error("select statecount(c6,'GT',1),t1 from ct1")
# unique with common col
# unique with common col
tdSql.error("select statecount(c6,'GT',1) ,ts from ct1")
tdSql.error("select statecount(c6,'GT',1) ,c1 from ct1")
# unique with scalar function
# unique with scalar function
tdSql.error("select statecount(c6,'GT',1) ,abs(c1) from ct1")
tdSql.error("select statecount(c6,'GT',1) , unique(c2) from ct1")
tdSql.error("select statecount(c6,'GT',1) , abs(c2)+2 from ct1")
# unique with aggregate function
# unique with aggregate function
tdSql.error("select statecount(c6,'GT',1) ,sum(c1) from ct1")
tdSql.error("select statecount(c6,'GT',1) ,max(c1) from ct1")
tdSql.error("select statecount(c6,'GT',1) ,csum(c1) from ct1")
......@@ -262,16 +262,16 @@ class TDTestCase:
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 2)
tdSql.checkData(6, 0, -1)
# unique with union all
# unique with union all
tdSql.query("select statecount(c1,'GT',1) from ct4 union all select statecount(c1,'GT',1) from ct1")
tdSql.checkRows(25)
tdSql.query("select statecount(c1,'GT',1) from ct4 union all select distinct(c1) from ct4")
tdSql.checkRows(22)
# unique with join
# prepare join datas with same ts
# unique with join
# prepare join datas with same ts
tdSql.execute(" use db ")
tdSql.execute(" create stable st1 (ts timestamp , num int) tags(ind int)")
......@@ -323,7 +323,7 @@ class TDTestCase:
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 0.000000000)
tdSql.checkData(3, 0, -1.000000000)
# bug for stable
#partition by tbname
......@@ -332,21 +332,20 @@ class TDTestCase:
# tdSql.query(" select unique(c1) from stb1 partition by tbname ")
# tdSql.checkRows(21)
# group by
# group by
tdSql.query("select statecount(c1,'GT',1) from ct1 group by c1")
tdSql.error("select statecount(c1,'GT',1) from ct1 group by tbname")
# super table
def check_unit_time(self):
tdSql.execute(" use db ")
tdSql.error("select stateduration(c1,'GT',1,1b) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1u) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.query("select stateduration(c1,'GT',1,1s) from t1")
tdSql.checkData(10,0,63072035)
tdSql.query("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.checkData(10,0,int(63072035/1000))
tdSql.query("select stateduration(c1,'GT',1,1m) from t1")
tdSql.checkData(10,0,int(63072035/60))
tdSql.query("select stateduration(c1,'GT',1,1h) from t1")
......@@ -355,8 +354,8 @@ class TDTestCase:
tdSql.checkData(10,0,int(63072035/60/24/60))
tdSql.query("select stateduration(c1,'GT',1,1w) from t1")
tdSql.checkData(10,0,int(63072035/60/7/24/60))
def check_boundary_values(self):
tdSql.execute("drop database if exists bound_test")
......@@ -384,11 +383,11 @@ class TDTestCase:
tdSql.execute(
f"insert into sub1_bound values ( now(), -2147483643, -9223372036854775803, -32763, -123, -3.39E+38, -1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.error(
f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.query("select statecount(c1,'GT',1) from sub1_bound")
tdSql.checkRows(5)
......@@ -396,29 +395,29 @@ class TDTestCase:
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table ==============")
self.prepare_datas()
tdLog.printNoPrefix("==========step2:test errors ==============")
tdLog.printNoPrefix("==========step2:test errors ==============")
self.test_errors()
tdLog.printNoPrefix("==========step3:support types ============")
tdLog.printNoPrefix("==========step3:support types ============")
self.support_types()
tdLog.printNoPrefix("==========step4:support opers ============")
tdLog.printNoPrefix("==========step4:support opers ============")
self.support_opers()
tdLog.printNoPrefix("==========step5: statecount basic query ============")
tdLog.printNoPrefix("==========step5: statecount basic query ============")
self.basic_statecount_function()
tdLog.printNoPrefix("==========step6: statecount boundary query ============")
tdLog.printNoPrefix("==========step6: statecount boundary query ============")
self.check_boundary_values()
tdLog.printNoPrefix("==========step6: statecount unit time test ============")
tdLog.printNoPrefix("==========step6: statecount unit time test ============")
self.check_unit_time()
......
......@@ -12,11 +12,11 @@ class TDTestCase:
self.rowNum = 10
self.ts = 1537146000000 # 2018-9-17 09:00:00.000
def run(self):
tdSql.prepare()
intData = []
intData = []
floatData = []
tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
......@@ -27,18 +27,18 @@ class TDTestCase:
for i in range(self.rowNum):
tdSql.execute("insert into ntb values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
intData.append(i + 1)
intData.append(i + 1)
floatData.append(i + 0.1)
for i in range(self.rowNum):
tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
intData.append(i + 1)
floatData.append(i + 0.1)
intData.append(i + 1)
floatData.append(i + 0.1)
tdSql.query("select timetruncate(1,1d) from ntb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1u) from ntb")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from ntb")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from ntb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from ntb")
......@@ -97,8 +97,8 @@ class TDTestCase:
tdSql.query("select timetruncate(1,1d) from stb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1u) from stb")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from stb")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from stb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from stb")
......@@ -156,8 +156,8 @@ class TDTestCase:
tdSql.query("select timetruncate(1,1d) from stb_1")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1u) from stb_1")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from stb_1")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from stb_1")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from stb_1")
......@@ -217,4 +217,4 @@ class TDTestCase:
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
tdCases.addLinux(__file__, TDTestCase())
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
tdDnodes.start(1)
return
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 3
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0] + resultList[1]
if not (totalRowsInserted == actConsumeTotalRows):
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 3 end ...... ")
def tmqCase4(self):
tdLog.printNoPrefix("======== test case 4: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 5
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait commit notify")
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("pkill consume processor")
tdCom.killProcessor("tmq_sim")
# time.sleep(10)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 6
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 4 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase3()
self.tmqCase4()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -18,7 +18,8 @@ python3 ./test.py -f 0-others/fsync.py
python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
# BUG python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
python3 ./test.py -f 1-insert/alter_stable.py
python3 ./test.py -f 1-insert/alter_table.py
python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
......@@ -89,7 +90,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py
# python3 ./test.py -f 2-query/nestedQuery_str.py
python3 ./test.py -f 2-query/avg.py
python3 ./test.py -f 2-query/elapsed.py
#python3 ./test.py -f 2-query/elapsed.py
python3 ./test.py -f 2-query/csum.py
python3 ./test.py -f 2-query/mavg.py
python3 ./test.py -f 2-query/diff.py
......
......@@ -63,8 +63,9 @@ if __name__ == "__main__":
mnodeNums = 0
updateCfgDict = {}
execCmd = ""
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums'])
queryPolicy = 1
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy'])
for key, value in opts:
if key in ['-h', '--help']:
tdLog.printNoPrefix(
......@@ -82,6 +83,7 @@ if __name__ == "__main__":
tdLog.printNoPrefix('-e eval str to run')
tdLog.printNoPrefix('-N create dnodes numbers in clusters')
tdLog.printNoPrefix('-M create mnode numbers in clusters')
tdLog.printNoPrefix('-Q set queryPolicy in one dnode')
sys.exit(0)
......@@ -138,6 +140,9 @@ if __name__ == "__main__":
if key in ['-M', '--mnodeNums']:
mnodeNums = value
if key in ['-Q', '--queryPolicy']:
queryPolicy = value
if not execCmd == "":
tdDnodes.init(deployPath)
print(execCmd)
......@@ -276,6 +281,22 @@ if __name__ == "__main__":
tdDnodes.deploy(1,updateCfgDict)
tdDnodes.start(1)
tdCases.logSql(logSql)
if queryPolicy != 1:
queryPolicy=int(queryPolicy)
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
tdSql.init(conn.cursor())
tdSql.execute("create qnode on dnode 1")
tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
tdSql.query("show local variables;")
for i in range(tdSql.queryRows):
if tdSql.queryResult[i][0] == "queryPolicy" :
if int(tdSql.queryResult[i][1]) == int(queryPolicy):
tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
else :
tdLog.debug(tdSql.queryResult)
tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
else :
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums,mnodeNums=mnodeNums)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册