提交 1808fc79 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0

...@@ -68,7 +68,7 @@ typedef struct { ...@@ -68,7 +68,7 @@ typedef struct {
typedef struct { typedef struct {
char* qmsg; char* qmsg;
qTaskInfo_t task[5]; qTaskInfo_t task;
} STqExecCol; } STqExecCol;
typedef struct { typedef struct {
...@@ -82,7 +82,7 @@ typedef struct { ...@@ -82,7 +82,7 @@ typedef struct {
typedef struct { typedef struct {
int8_t subType; int8_t subType;
STqReader* pExecReader[5]; STqReader* pExecReader;
union { union {
STqExecCol execCol; STqExecCol execCol;
STqExecTb execTb; STqExecTb execTb;
...@@ -139,8 +139,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -139,8 +139,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId); int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp);
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
// tqMeta // tqMeta
......
...@@ -146,7 +146,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId); ...@@ -146,7 +146,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
......
...@@ -262,7 +262,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su ...@@ -262,7 +262,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; } static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; }
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t timeout = pReq->timeout; int64_t timeout = pReq->timeout;
...@@ -271,9 +271,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -271,9 +271,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal reqOffset = pReq->reqOffset;
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
// todo
workerId = 0;
// 1.find handle // 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/ /*ASSERT(pHandle);*/
...@@ -405,7 +402,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -405,7 +402,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp, workerId) < 0) { if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) {
/*ASSERT(0);*/ /*ASSERT(0);*/
} }
// TODO batch optimization: // TODO batch optimization:
...@@ -518,27 +515,23 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -518,27 +515,23 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver; pHandle->snapshotVer = ver;
req.qmsg = NULL; req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) { SReadHandle handle = {
SReadHandle handle = { .meta = pTq->pVnode->pMeta,
.meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode,
.vnode = pTq->pVnode, .initTableReader = true,
.initTableReader = true, .initTqReader = true,
.initTqReader = true, .version = ver,
.version = ver, };
}; pHandle->execHandle.execCol.task =
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper);
&pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.execCol.task);
ASSERT(pHandle->execHandle.execCol.task[i]); void* scanner = NULL;
void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner);
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); ASSERT(scanner);
ASSERT(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader);
ASSERT(pHandle->execHandle.pExecReader[i]);
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) { pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
}
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
...@@ -550,10 +543,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -550,10 +543,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
} }
for (int32_t i = 0; i < 5; i++) { pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
}
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
} }
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
......
...@@ -37,8 +37,8 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, ...@@ -37,8 +37,8 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp,
return 0; return 0;
} }
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataRsp* pRsp) { static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
if (pSW == NULL) { if (pSW == NULL) {
return -1; return -1;
} }
...@@ -61,7 +61,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { ...@@ -61,7 +61,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle; const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->execCol.task[0]; qTaskInfo_t task = pExec->execCol.task;
if (qStreamPrepareScan(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset) < 0) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
...@@ -89,7 +89,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -89,7 +89,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
...@@ -184,12 +184,12 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ...@@ -184,12 +184,12 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
} }
#endif #endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) {
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReader* pReader = pExec->pExecReader[workerId]; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
...@@ -197,18 +197,18 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -197,18 +197,18 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReader* pReader = pExec->pExecReader[workerId]; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
...@@ -216,13 +216,13 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -216,13 +216,13 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
} }
......
...@@ -80,28 +80,23 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -80,28 +80,23 @@ int32_t tqMetaOpen(STQ* pTq) {
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle); tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
/*for (int32_t i = 0; i < 5; i++) {*/
/*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) { SReadHandle reader = {
SReadHandle reader = { .meta = pTq->pVnode->pMeta,
.meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode,
.vnode = pTq->pVnode, .initTableReader = true,
.initTableReader = true, .initTqReader = true,
.initTqReader = true, .version = handle.snapshotVer,
.version = handle.snapshotVer, };
};
handle.execHandle.execCol.task =
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
&handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.execCol.task);
ASSERT(handle.execHandle.execCol.task[i]); void* scanner = NULL;
void* scanner = NULL; qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); ASSERT(scanner);
ASSERT(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
handle.execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); ASSERT(handle.execHandle.pExecReader);
ASSERT(handle.execHandle.pExecReader[i]);
}
} else { } else {
handle.execHandle.execDb.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......
...@@ -394,10 +394,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -394,10 +394,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) { int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task, tbUidList, isAdd);
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd); ASSERT(code == 0);
ASSERT(code == 0);
}
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
if (!isAdd) { if (!isAdd) {
int32_t sz = taosArrayGetSize(tbUidList); int32_t sz = taosArrayGetSize(tbUidList);
......
...@@ -127,6 +127,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -127,6 +127,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
tqDebug("tq sink, convert block %d, rows: %d", i, rows);
int32_t dataLen = 0; int32_t dataLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
...@@ -178,11 +180,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -178,11 +180,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data; const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
tqDebug("task write into table, vgId %d, block num: %d", pVnode->config.vgId, (int32_t)pRes->size); tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId); pTask->tbSink.stbFullName, pVnode->config.vgId);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/ /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg // build write msg
SRpcMsg msg = { SRpcMsg msg = {
......
...@@ -316,7 +316,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -316,7 +316,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TABLE_CFG: case TDMT_VND_TABLE_CFG:
return vnodeGetTableCfg(pVnode, pMsg); return vnodeGetTableCfg(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH:
......
...@@ -229,8 +229,8 @@ typedef struct { ...@@ -229,8 +229,8 @@ typedef struct {
int8_t stop; int8_t stop;
} SAsyncPool; } SAsyncPool;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool); void transAsyncPoolDestroy(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq); int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool); bool transAsyncPoolIsEmpty(SAsyncPool* pool);
...@@ -322,7 +322,7 @@ typedef struct STransReq { ...@@ -322,7 +322,7 @@ typedef struct STransReq {
} STransReq; } STransReq;
void transReqQueueInit(queue* q); void transReqQueueInit(queue* q);
void* transReqQueuePushReq(queue* q); void* transReqQueuePush(queue* q);
void* transReqQueueRemove(void* arg); void* transReqQueueRemove(void* arg);
void transReqQueueClear(queue* q); void transReqQueueClear(queue* q);
...@@ -393,9 +393,9 @@ typedef struct SDelayQueue { ...@@ -393,9 +393,9 @@ typedef struct SDelayQueue {
uv_loop_t* loop; uv_loop_t* loop;
} SDelayQueue; } SDelayQueue;
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b); bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
/* /*
......
...@@ -26,7 +26,7 @@ typedef struct SCliConn { ...@@ -26,7 +26,7 @@ typedef struct SCliConn {
SConnBuffer readBuf; SConnBuffer readBuf;
STransQueue cliMsgs; STransQueue cliMsgs;
queue conn; queue q;
uint64_t expireTime; uint64_t expireTime;
STransCtx ctx; STransCtx ctx;
...@@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) { ...@@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) {
while (p != NULL) { while (p != NULL) {
while (!QUEUE_IS_EMPTY(&p->conn)) { while (!QUEUE_IS_EMPTY(&p->conn)) {
queue* h = QUEUE_HEAD(&p->conn); queue* h = QUEUE_HEAD(&p->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); SCliConn* c = QUEUE_DATA(h, SCliConn, q);
if (c->expireTime < currentTime) { if (c->expireTime < currentTime) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
transUnrefCliHandle(c); transUnrefCliHandle(c);
...@@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) { ...@@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) {
while (connList != NULL) { while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) { while (!QUEUE_IS_EMPTY(&connList->conn)) {
queue* h = QUEUE_HEAD(&connList->conn); queue* h = QUEUE_HEAD(&connList->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true); cliDestroyConn(c, true);
} }
connList = taosHashIterate((SHashObj*)pool, connList); connList = taosHashIterate((SHashObj*)pool, connList);
...@@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
return NULL; return NULL;
} }
queue* h = QUEUE_HEAD(&plist->conn); queue* h = QUEUE_HEAD(&plist->conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal; conn->status = ConnNormal;
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->q);
assert(h == &conn->conn); assert(h == &conn->q);
return conn; return conn;
} }
static int32_t allocConnRef(SCliConn* conn, bool update) { static int32_t allocConnRef(SCliConn* conn, bool update) {
...@@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->q);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&plist->conn)); assert(!QUEUE_IS_EMPTY(&plist->conn));
} }
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
...@@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
transReqQueueInit(&conn->wreqQueue); transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL); transQueueInit(&conn->cliMsgs, NULL);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->status = ConnNormal; conn->status = ConnNormal;
conn->broken = 0; conn->broken = 0;
...@@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
} }
static void cliDestroyConn(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->q);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
...@@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) { ...@@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
_RETURN: _RETURN:
...@@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() { ...@@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer); uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd; pThrd->timer.data = pThrd;
...@@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
CLI_RELEASE_UV(pThrd->loop); CLI_RELEASE_UV(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx); taosThreadMutexDestroy(&pThrd->msgMtx);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
transDestroyAsyncPool(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsg); transDQDestroy(pThrd->delayQueue, destroyCmsg);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
...@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) { ...@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) {
cliHandleReq(pMsg, pThrd); cliHandleReq(pMsg, pThrd);
} }
static void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
SCliThrd* pThrd = arg->param2;
}
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
...@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { ...@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
} }
} }
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
if ((pResp == NULL || pResp->info.hasEpSet == 0)) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
return false; return false;
} }
...@@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
*/ */
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code; int32_t code = pResp->code;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
if (retry) { if (retry) {
pMsg->sent = 0; pMsg->sent = 0;
pCtx->retryCnt += 1; pCtx->retryCnt += 1;
...@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryCnt < pCtx->retryLimit) { if (pCtx->retryCnt < pCtx->retryLimit) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(&pCtx->epSet);
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd); cliSchedMsgToNextNode(pMsg, pThrd);
return -1; return -1;
} }
...@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId* trace = &pResp->info.traceId; STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryToExtractEpSet(pResp, &pCtx->epSet); bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) { if (hasEpSet) {
char tbuf[256] = {0}; char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf); EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
...@@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs ...@@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) { int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
tsem_destroy(sem); if (ret != 0) {
taosMemoryFree(sem);
destroyCmsg(cliMsg); destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); goto _RETURN;
return -1;
} }
tsem_wait(sem); tsem_wait(sem);
_RETURN:
tsem_destroy(sem); tsem_destroy(sem);
taosMemoryFree(sem); taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0; return ret;
} }
/* /*
* *
......
...@@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) { ...@@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) {
return ret; return ret;
} }
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
pool->nAsync = sz; pool->nAsync = sz;
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
...@@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) ...@@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
return pool; return pool;
} }
void transDestroyAsyncPool(SAsyncPool* pool) { void transAsyncPoolDestroy(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) { for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]); uv_async_t* async = &(pool->asyncs[i]);
// uv_close((uv_handle_t*)async, NULL); // uv_close((uv_handle_t*)async, NULL);
...@@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) { ...@@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree(pool->asyncs); taosMemoryFree(pool->asyncs);
taosMemoryFree(pool); taosMemoryFree(pool);
} }
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
}
return true;
}
int transAsyncSend(SAsyncPool* pool, queue* q) { int transAsyncSend(SAsyncPool* pool, queue* q) {
if (atomic_load_8(&pool->stop) == 1) { if (atomic_load_8(&pool->stop) == 1) {
return -1; return -1;
...@@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { ...@@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
} }
return uv_async_send(async); return uv_async_send(async);
} }
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
}
return true;
}
void transCtxInit(STransCtx* ctx) { void transCtxInit(STransCtx* ctx) {
// init transCtx // init transCtx
...@@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) { ...@@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) {
// init req queue // init req queue
QUEUE_INIT(q); QUEUE_INIT(q);
} }
void* transReqQueuePushReq(queue* q) { void* transReqQueuePush(queue* q) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq)); STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
wreq->data = req; wreq->data = req;
...@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { ...@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
heapDestroy(queue->heap); heapDestroy(queue->heap);
taosMemoryFree(queue); taosMemoryFree(queue);
} }
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
uv_timer_stop(queue->timer);
if (heapSize(queue->heap) <= 0) return;
heapRemove(queue->heap, &task->node);
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { if (heapSize(queue->heap) != 0) {
HeapNode* minNode = heapMin(queue->heap);
if (minNode != NULL) return;
uint64_t now = taosGetTimestampMs();
SDelayTask* task = container_of(minNode, SDelayTask, node);
uint64_t timeout = now > task->execTime ? now - task->execTime : 0;
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
}
}
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
uint64_t now = taosGetTimestampMs(); uint64_t now = taosGetTimestampMs();
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
task->func = func; task->func = func;
...@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ ...@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs); tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
heapInsert(queue->heap, &task->node); heapInsert(queue->heap, &task->node);
uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
return 0; return task;
} }
void transPrintEpSet(SEpSet* pEpSet) { void transPrintEpSet(SEpSet* pEpSet) {
......
...@@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { ...@@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
uvPrepareSendData(smsg, &wb); uvPrepareSendData(smsg, &wb);
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSvrMsg* smsg) { static void uvStartSendResp(SSvrMsg* smsg) {
...@@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { ...@@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
// conn set // conn set
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true; return true;
...@@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { ...@@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
taosThreadJoin(pThrd->thread, NULL); taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transDestroyAsyncPool(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
......
...@@ -809,6 +809,8 @@ class StateEmpty(AnyState): ...@@ -809,6 +809,8 @@ class StateEmpty(AnyState):
] ]
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
if Config.getConfig().ignore_errors: # if we are asked to ignore certain errors, let's not verify CreateDB success.
return
if (self.hasSuccess(tasks, TaskCreateDb) if (self.hasSuccess(tasks, TaskCreateDb)
): # at EMPTY, if there's succes in creating DB ): # at EMPTY, if there's succes in creating DB
if (not self.hasTask(tasks, TaskDropDb)): # and no drop_db tasks if (not self.hasTask(tasks, TaskDropDb)): # and no drop_db tasks
...@@ -2491,7 +2493,7 @@ class MainExec: ...@@ -2491,7 +2493,7 @@ class MainExec:
action='store', action='store',
default=None, default=None,
type=str, type=str,
help='Ignore error codes, comma separated, 0x supported (default: None)') help='Ignore error codes, comma separated, 0x supported, also suppresses certain transition state checks. (default: None)')
parser.add_argument( parser.add_argument(
'-i', '-i',
'--num-replicas', '--num-replicas',
......
...@@ -28,6 +28,7 @@ from util.common import * ...@@ -28,6 +28,7 @@ from util.common import *
from util.constant import * from util.constant import *
from dataclasses import dataclass,field from dataclasses import dataclass,field
from typing import List from typing import List
from datetime import datetime
@dataclass @dataclass
class DataSet: class DataSet:
......
...@@ -89,7 +89,7 @@ ...@@ -89,7 +89,7 @@
./test.sh -f tsim/parser/alter.sim ./test.sh -f tsim/parser/alter.sim
# jira ./test.sh -f tsim/parser/alter1.sim # jira ./test.sh -f tsim/parser/alter1.sim
./test.sh -f tsim/parser/auto_create_tb_drop_tb.sim ./test.sh -f tsim/parser/auto_create_tb_drop_tb.sim
# jira ./test.sh -f tsim/parser/auto_create_tb.sim ./test.sh -f tsim/parser/auto_create_tb.sim
./test.sh -f tsim/parser/between_and.sim ./test.sh -f tsim/parser/between_and.sim
./test.sh -f tsim/parser/binary_escapeCharacter.sim ./test.sh -f tsim/parser/binary_escapeCharacter.sim
# jira ./test.sh -f tsim/parser/col_arithmetic_operation.sim # jira ./test.sh -f tsim/parser/col_arithmetic_operation.sim
...@@ -121,25 +121,21 @@ ...@@ -121,25 +121,21 @@
./test.sh -f tsim/parser/insert_multiTbl.sim ./test.sh -f tsim/parser/insert_multiTbl.sim
./test.sh -f tsim/parser/insert_tb.sim ./test.sh -f tsim/parser/insert_tb.sim
# jira ./test.sh -f tsim/parser/interp.sim # jira ./test.sh -f tsim/parser/interp.sim
# ./test.sh -f tsim/parser/join.sim ./test.sh -f tsim/parser/join_manyblocks.sim
# ./test.sh -f tsim/parser/join_manyblocks.sim
## ./test.sh -f tsim/parser/join_multitables.sim ## ./test.sh -f tsim/parser/join_multitables.sim
# ./test.sh -f tsim/parser/join_multivnode.sim # ./test.sh -f tsim/parser/join_multivnode.sim
# jira ./test.sh -f tsim/parser/join.sim
./test.sh -f tsim/parser/last_cache.sim ./test.sh -f tsim/parser/last_cache.sim
## ./test.sh -f tsim/parser/last_groupby.sim ./test.sh -f tsim/parser/last_groupby.sim
# jira ./test.sh -f tsim/parser/lastrow.sim # jira ./test.sh -f tsim/parser/lastrow.sim
## ./test.sh -f tsim/parser/like.sim ./test.sh -f tsim/parser/like.sim
# ./test.sh -f tsim/parser/limit.sim # jira ./test.sh -f tsim/parser/limit.sim
# ./test.sh -f tsim/parser/limit1.sim # jira ./test.sh -f tsim/parser/limit1.sim
# ./test.sh -f tsim/parser/limit1_tblocks100.sim # jira ./test.sh -f tsim/parser/limit2.sim
## ./test.sh -f tsim/parser/limit2.sim # jira ./test.sh -f tsim/parser/line_insert.sim
## ./test.sh -f tsim/parser/limit2_tblocks100.sim
## ./test.sh -f tsim/parser/limit_stb.sim
## ./test.sh -f tsim/parser/limit_tb.sim
## ./test.sh -f tsim/parser/line_insert.sim
./test.sh -f tsim/parser/mixed_blocks.sim ./test.sh -f tsim/parser/mixed_blocks.sim
./test.sh -f tsim/parser/nchar.sim ./test.sh -f tsim/parser/nchar.sim
# ./test.sh -f tsim/parser/nestquery.sim # jira ./test.sh -f tsim/parser/nestquery.sim
# jira ./test.sh -f tsim/parser/null_char.sim # jira ./test.sh -f tsim/parser/null_char.sim
./test.sh -f tsim/parser/precision_ns.sim ./test.sh -f tsim/parser/precision_ns.sim
./test.sh -f tsim/parser/projection_limit_offset.sim ./test.sh -f tsim/parser/projection_limit_offset.sim
...@@ -165,7 +161,8 @@ ...@@ -165,7 +161,8 @@
# jira ./test.sh -f tsim/parser/udf_dll_stable.sim # jira ./test.sh -f tsim/parser/udf_dll_stable.sim
# jira ./test.sh -f tsim/parser/udf_dll.sim # jira ./test.sh -f tsim/parser/udf_dll.sim
# jira ./test.sh -f tsim/parser/udf.sim # jira ./test.sh -f tsim/parser/udf.sim
# ./test.sh -f tsim/parser/union.sim ./test.sh -f tsim/parser/union.sim
# jira ./test.sh -f tsim/parser/union_sysinfo.sim
# jira ./test.sh -f tsim/parser/where.sim # jira ./test.sh -f tsim/parser/where.sim
# ---- query # ---- query
......
...@@ -84,6 +84,7 @@ echo "SIM_DIR : $SIM_DIR" ...@@ -84,6 +84,7 @@ echo "SIM_DIR : $SIM_DIR"
echo "CODE_DIR : $CODE_DIR" echo "CODE_DIR : $CODE_DIR"
echo "CFG_DIR : $CFG_DIR" echo "CFG_DIR : $CFG_DIR"
rm -rf $SIM_DIR/*
rm -rf $LOG_DIR rm -rf $LOG_DIR
rm -rf $CFG_DIR rm -rf $CFG_DIR
......
...@@ -282,7 +282,7 @@ if $rows != 2 then ...@@ -282,7 +282,7 @@ if $rows != 2 then
return -1 return -1
endi endi
sql insert into tick_000001 ('ts', 'last_prc', 'volume', 'amount', 'oi', 'bid_prc1', 'ask_prc1') using tick tags (000001, Stocks) VALUES (1546391700000, 0.000000, 0, 0.000000, 0, 0.000000, 10.320000); sql insert into tick_000001 (ts, last_prc, volume, amount, oi, bid_prc1, ask_prc1) using tick tags ('000001', 'Stocks') VALUES (1546391700000, 0.000000, 0, 0.000000, 0, 0.000000, 10.320000);
sql select tbname from tick sql select tbname from tick
if $rows != 1 then if $rows != 1 then
return -1 return -1
......
...@@ -233,8 +233,15 @@ endi ...@@ -233,8 +233,15 @@ endi
print 1 print 1
#select + where condition + interval query #select + where condition + interval query
sql select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by join_tb0.ts desc; print select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by _wstart asc;
sql select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by _wstart asc;
$val = 100
if $rows != $val then
return -1
endi
print select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by _wstart desc;
sql select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by _wstart desc;
$val = 100 $val = 100
if $rows != $val then if $rows != $val then
return -1 return -1
......
...@@ -73,8 +73,6 @@ while $i < $tbNum ...@@ -73,8 +73,6 @@ while $i < $tbNum
$tstart = 100000 $tstart = 100000
endw endw
sleep 100
print ===============join_manyblocks.sim print ===============join_manyblocks.sim
print ==============> td-3313 print ==============> td-3313
sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1; sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1;
......
...@@ -4,14 +4,11 @@ system sh/exec.sh -n dnode1 -s start ...@@ -4,14 +4,11 @@ system sh/exec.sh -n dnode1 -s start
sql connect sql connect
print ======================== dnode1 start print ======================== dnode1 start
$db = testdb $db = testdb
sql create database $db sql create database $db
sql use $db sql use $db
sql create stable st2 (ts timestamp, f1 int, f2 float, f3 double, f4 bigint, f5 smallint, f6 tinyint, f7 bool, f8 binary(10), f9 nchar(10)) tags (id1 int, id2 float, id3 nchar(10), id4 double, id5 smallint, id6 bigint, id7 binary(10)) sql create stable st2 (ts timestamp, f1 int, f2 float, f3 double, f4 bigint, f5 smallint, f6 tinyint, f7 bool, f8 binary(10), f9 nchar(10)) tags (id1 int, id2 float, id3 nchar(10), id4 double, id5 smallint, id6 bigint, id7 binary(10))
sql create table tb1 using st2 tags (1,1.0,"1",1.0,1,1,"1"); sql create table tb1 using st2 tags (1,1.0,"1",1.0,1,1,"1");
sql insert into tb1 values (now-200s,1,1.0,1.0,1,1,1,true,"1","1") sql insert into tb1 values (now-200s,1,1.0,1.0,1,1,1,true,"1","1")
...@@ -23,16 +20,13 @@ sql insert into tb1 values (now+300s,4,4.0,4.0,4,4,4,true,"4","4") ...@@ -23,16 +20,13 @@ sql insert into tb1 values (now+300s,4,4.0,4.0,4,4,4,true,"4","4")
sql insert into tb1 values (now+400s,4,4.0,4.0,4,4,4,true,"4","4") sql insert into tb1 values (now+400s,4,4.0,4.0,4,4,4,true,"4","4")
sql insert into tb1 values (now+500s,4,4.0,4.0,4,4,4,true,"4","4") sql insert into tb1 values (now+500s,4,4.0,4.0,4,4,4,true,"4","4")
sql select f1,last(*) from st2 group by f1; sql select f1, last(*) from st2 group by f1 order by f1;
if $rows != 4 then if $rows != 4 then
return -1 return -1
endi endi
if $data00 != 1 then if $data00 != 1 then
return -1 return -1
endi endi
if $data02 != 1 then if $data02 != 1 then
print $data02 print $data02
return -1 return -1
...@@ -59,15 +53,13 @@ if $data09 != 1 then ...@@ -59,15 +53,13 @@ if $data09 != 1 then
return -1 return -1
endi endi
sql select f1,last(f1,st2.*) from st2 group by f1; sql select f1, last(f1,st2.*) from st2 group by f1 order by f1;
if $rows != 4 then if $rows != 4 then
return -1 return -1
endi endi
if $data00 != 1 then if $data00 != 1 then
return -1 return -1
endi endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
......
...@@ -5,7 +5,6 @@ sql connect ...@@ -5,7 +5,6 @@ sql connect
print ======================== dnode1 start print ======================== dnode1 start
$db = testdb $db = testdb
sql drop database if exists $db sql drop database if exists $db
sql create database $db cachemodel 'last_value' sql create database $db cachemodel 'last_value'
...@@ -32,7 +31,6 @@ if $rows != 2 then ...@@ -32,7 +31,6 @@ if $rows != 2 then
return -1 return -1
endi endi
sql select b from $table1 where b like 'table\_name' sql select b from $table1 where b like 'table\_name'
if $rows != 1 then if $rows != 1 then
return -1 return -1
......
...@@ -18,7 +18,7 @@ $stb = $stbPrefix . $i ...@@ -18,7 +18,7 @@ $stb = $stbPrefix . $i
sql drop database $db -x step1 sql drop database $db -x step1
step1: step1:
sql create database $db sql create database $db cache 16
print ====== create tables print ====== create tables
sql use $db sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int) sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
...@@ -57,11 +57,10 @@ run tsim/parser/limit1_stb.sim ...@@ -57,11 +57,10 @@ run tsim/parser/limit1_stb.sim
print ================== restart server to commit data into disk print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 500
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
print ================== server restart completed print ================== server restart completed
run tsim/parser/limit1_tb.sim run tsim/parser/limit1_tb.sim
run tsim/parser/limit1_stb.sim run tsim/parser/limit1_stb.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
sleep 100
sql connect sql connect
$dbPrefix = lm1_db $dbPrefix = lm1_db
......
sleep 100
sql connect sql connect
$dbPrefix = lm1_db $dbPrefix = lm1_db
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
$dbPrefix = lm1_db
$tbPrefix = lm1_tb
$stbPrefix = lm1_stb
$tbNum = 10
$rowNum = 10000
$totalNum = $tbNum * $rowNum
$ts0 = 1537146000000
$delta = 600000
print ========== limit1.sim
$i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db cache 16
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
$i = 0
$ts = $ts0
$halfNum = $tbNum / 2
while $i < $halfNum
$tbId = $i + $halfNum
$tb = $tbPrefix . $i
$tb1 = $tbPrefix . $tbId
sql create table $tb using $stb tags( $i )
sql create table $tb1 using $stb tags( $tbId )
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar ) $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
endw
print ====== tables created
run tsim/parser/limit1_tb.sim
run tsim/parser/limit1_stb.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 500
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
run tsim/parser/limit1_tb.sim
run tsim/parser/limit1_stb.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c rowsInFileBlock -v 255
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect sql connect
$dbPrefix = lm2_db $dbPrefix = lm2_db
...@@ -69,10 +65,8 @@ print ====== tables created ...@@ -69,10 +65,8 @@ print ====== tables created
print ================== restart server to commit data into disk print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 500 sleep 100
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
print ================== server restart completed print ================== server restart completed
run tsim/parser/limit2_query.sim run tsim/parser/limit2_query.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
sleep 100
sql connect sql connect
$dbPrefix = lm2_db $dbPrefix = lm2_db
...@@ -24,8 +23,11 @@ sql use $db ...@@ -24,8 +23,11 @@ sql use $db
##### aggregation on stb with 6 tags + where + group by + limit offset ##### aggregation on stb with 6 tags + where + group by + limit offset
$val1 = 1 $val1 = 1
$val2 = $tbNum - 1 $val2 = $tbNum - 1
sql select count(*) from $stb where t1 > $val1 and t1 < $val2 group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0 print select count(*) from $stb where t1 > $val1 and t1 < $val2 group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0
sql select count(*), t1, t2, t3, t4, t5, t6 from $stb where t1 > $val1 and t1 < $val2 group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0
$val = $tbNum - 3 $val = $tbNum - 3
print $rows $val
if $rows != $val then if $rows != $val then
return -1 return -1
endi endi
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c rowsInFileBlock -v 255
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
$dbPrefix = lm2_db
$tbPrefix = lm2_tb
$stbPrefix = lm2_stb
$tbNum = 10
$rowNum = 10000
$totalNum = $tbNum * $rowNum
$ts0 = 1537146000000
$delta = 600000
$tsu = $rowNum * $delta
$tsu = $tsu - $delta
$tsu = $tsu + $ts0
print ========== limit2.sim
$i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db tblocks 100
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int, t2 nchar(20), t3 binary(20), t4 bigint, t5 smallint, t6 double)
$i = 0
$ts = $ts0
$halfNum = $tbNum / 2
while $i < $halfNum
$i1 = $i + $halfNum
$tb = $tbPrefix . $i
$tb1 = $tbPrefix . $i1
$tgstr = 'tb . $i
$tgstr = $tgstr . '
$tgstr1 = 'tb . $i1
$tgstr1 = $tgstr1 . '
sql create table $tb using $stb tags( $i , $tgstr , $tgstr , $i , $i , $i )
sql create table $tb1 using $stb tags( $i1 , $tgstr1 , $tgstr1 , $i , $i , $i )
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar )
sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
endw
print ====== tables created
#run tsim/parser/limit2_query.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 100
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
run tsim/parser/limit2_query.sim
sleep 100
sql connect sql connect
$dbPrefix = lm_db $dbPrefix = lm_db
......
sleep 100
sql connect sql connect
$dbPrefix = lm_db $dbPrefix = lm_db
......
...@@ -43,7 +43,7 @@ endi ...@@ -43,7 +43,7 @@ endi
#print =============== clear #print =============== clear
sql drop database $db sql drop database $db
sql show databases sql show databases
if $rows != 0 then if $rows != 2 then
return -1 return -1
endi endi
......
...@@ -53,8 +53,6 @@ while $i < $half ...@@ -53,8 +53,6 @@ while $i < $half
$i = $i + 1 $i = $i + 1
endw endw
sleep 100
$i = 1 $i = 1
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
...@@ -63,7 +61,6 @@ sql select count(*) from (select count(*) from nest_mt0) ...@@ -63,7 +61,6 @@ sql select count(*) from (select count(*) from nest_mt0)
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 1 then if $data00 != 1 then
return -1 return -1
endi endi
...@@ -72,35 +69,31 @@ sql select count(*) from (select count(*) from nest_mt0 group by tbname) ...@@ -72,35 +69,31 @@ sql select count(*) from (select count(*) from nest_mt0 group by tbname)
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 10 then if $data00 != 10 then
return -1 return -1
endi endi
sql select count(*) from (select count(*) from nest_mt0 interval(10h) group by tbname) sql select count(*) from (select count(*) from nest_mt0 partition by tbname interval(10h) )
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 170 then if $data00 != 170 then
return -1 return -1
endi endi
sql select sum(a) from (select count(*) a from nest_mt0 interval(10h) group by tbname) sql select sum(a) from (select count(*) a from nest_mt0 partition by tbname interval(10h))
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 100000 then if $data00 != 100000 then
return -1 return -1
endi endi
print =================> alias name test print =================> alias name test
sql select ts from (select count(*) a from nest_tb0 interval(1h)) sql select ts from (select _wstart as ts, count(*) a from nest_tb0 interval(1h))
if $rows != 167 then if $rows != 167 then
return -1 return -1
endi endi
if $data00 != @20-09-15 00:00:00.000@ then if $data00 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
...@@ -109,7 +102,6 @@ sql select count(a) from (select count(*) a from nest_tb0 interval(1h)) ...@@ -109,7 +102,6 @@ sql select count(a) from (select count(*) a from nest_tb0 interval(1h))
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 167 then if $data00 != 167 then
return -1 return -1
endi endi
...@@ -125,19 +117,16 @@ if $rows != 0 then ...@@ -125,19 +117,16 @@ if $rows != 0 then
return -1 return -1
endi endi
sql select * from (select count(*) a, tbname f1 from nest_mt0 group by tbname) t where t.a>0 and f1 = 'nest_tb0'; sql select * from (select count(*) a, tbname f1, tbname from nest_mt0 group by tbname) t where t.a>0 and f1 = 'nest_tb0';
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 10000 then if $data00 != 10000 then
return -1 return -1
endi endi
if $data01 != @nest_tb0@ then if $data01 != @nest_tb0@ then
return -1 return -1
endi endi
if $data02 != @nest_tb0@ then if $data02 != @nest_tb0@ then
return -1 return -1
endi endi
...@@ -145,37 +134,30 @@ endi ...@@ -145,37 +134,30 @@ endi
print ===================> nest query interval print ===================> nest query interval
sql_error select ts, avg(c1) from (select ts, c1 from nest_tb0); sql_error select ts, avg(c1) from (select ts, c1 from nest_tb0);
sql select avg(c1) from (select * from nest_tb0) interval(3d) sql select _wstart, avg(c1) from (select * from nest_tb0) interval(3d)
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
if $data00 != @20-09-14 00:00:00.000@ then if $data00 != @20-09-14 00:00:00.000@ then
return -1 return -1
endi endi
if $data01 != 49.222222222 then if $data01 != 49.222222222 then
return -1 return -1
endi endi
if $data10 != @20-09-17 00:00:00.000@ then if $data10 != @20-09-17 00:00:00.000@ then
print expect 20-09-17 00:00:00.000, actual: $data10
return -1 return -1
endi endi
if $data11 != 49.581325301 then
if $data11 != 49.685185185 then
return -1 return -1
endi endi
if $data20 != @20-09-20 00:00:00.000@ then if $data20 != @20-09-20 00:00:00.000@ then
return -1 return -1
endi endi
if $data21 != 49.703539823 then
if $data21 != 49.500000000 then
return -1 return -1
endi endi
sql_error select stddev(c1) from (select c1 from nest_tb0); sql select stddev(c1) from (select c1 from nest_tb0);
sql_error select percentile(c1, 20) from (select * from nest_tb0); sql_error select percentile(c1, 20) from (select * from nest_tb0);
sql_error select interp(c1) from (select * from nest_tb0); sql_error select interp(c1) from (select * from nest_tb0);
sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0); sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0);
...@@ -184,39 +166,31 @@ sql_error select irate(c1) from (select c1 from nest_tb0); ...@@ -184,39 +166,31 @@ sql_error select irate(c1) from (select c1 from nest_tb0);
sql_error select diff(c1), twa(c1) from (select * from nest_tb0); sql_error select diff(c1), twa(c1) from (select * from nest_tb0);
sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0); sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0);
sql select apercentile(c1, 50) from (select * from nest_tb0) interval(1d) sql select _wstart, apercentile(c1, 50) from (select * from nest_tb0) interval(1d)
if $rows != 7 then if $rows != 7 then
return -1 return -1
endi endi
if $data00 != @20-09-15 00:00:00.000@ then if $data00 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
if $data01 != 47.571428571 then if $data01 != 47.571428571 then
return -1 return -1
endi endi
if $data10 != @20-09-16 00:00:00.000@ then if $data10 != @20-09-16 00:00:00.000@ then
return -1 return -1
endi endi
if $data11 != 49.666666667 then if $data11 != 49.666666667 then
return -1 return -1
endi endi
if $data20 != @20-09-17 00:00:00.000@ then if $data20 != @20-09-17 00:00:00.000@ then
return -1 return -1
endi endi
if $data21 != 49.000000000 then if $data21 != 49.000000000 then
return -1 return -1
endi endi
if $data30 != @20-09-18 00:00:00.000@ then if $data30 != @20-09-18 00:00:00.000@ then
return -1 return -1
endi endi
if $data31 != 48.333333333 then if $data31 != 48.333333333 then
return -1 return -1
endi endi
...@@ -225,7 +199,6 @@ sql select twa(c1) from (select * from nest_tb0); ...@@ -225,7 +199,6 @@ sql select twa(c1) from (select * from nest_tb0);
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 49.500000000 then if $data00 != 49.500000000 then
return -1 return -1
endi endi
...@@ -234,7 +207,6 @@ sql select leastsquares(c1, 1, 1) from (select * from nest_tb0); ...@@ -234,7 +207,6 @@ sql select leastsquares(c1, 1, 1) from (select * from nest_tb0);
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != @{slop:0.000100, intercept:49.000000}@ then if $data00 != @{slop:0.000100, intercept:49.000000}@ then
return -1 return -1
endi endi
...@@ -248,19 +220,15 @@ sql select derivative(c1, 1s, 0) from (select * from nest_tb0); ...@@ -248,19 +220,15 @@ sql select derivative(c1, 1s, 0) from (select * from nest_tb0);
if $rows != 9999 then if $rows != 9999 then
return -1 return -1
endi endi
if $data00 != @20-09-15 00:01:00.000@ then if $data00 != @20-09-15 00:01:00.000@ then
return -1 return -1
endi endi
if $data01 != 0.016666667 then if $data01 != 0.016666667 then
return -1 return -1
endi endi
if $data10 != @20-09-15 00:02:00.000@ then if $data10 != @20-09-15 00:02:00.000@ then
return -1 return -1
endi endi
if $data11 != 0.016666667 then if $data11 != 0.016666667 then
return -1 return -1
endi endi
...@@ -274,54 +242,42 @@ sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spre ...@@ -274,54 +242,42 @@ sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spre
if $rows != 7 then if $rows != 7 then
return -1 return -1
endi endi
if $data00 != @20-09-15 00:00:00.000@ then if $data00 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
if $data01 != 48.666666667 then if $data01 != 48.666666667 then
print expect 48.666666667, actual: $data01 print expect 48.666666667, actual: $data01
return -1 return -1
endi endi
if $data02 != 70080.000000000 then if $data02 != 70080.000000000 then
return -1 return -1
endi endi
if $data03 != 99 then if $data03 != 99 then
return -1 return -1
endi endi
if $data04 != 0 then if $data04 != 0 then
return -1 return -1
endi endi
if $data05 != 1440 then if $data05 != 1440 then
return -1 return -1
endi endi
if $data06 != 0 then if $data06 != 0 then
print $data06 print $data06
return -1 return -1
endi endi
if $data07 != 1 then if $data07 != 1 then
return -1 return -1
endi endi
if $data08 != 99.000000000 then if $data08 != 99.000000000 then
print expect 99.000000000, actual: $data08 print expect 99.000000000, actual: $data08
return -1 return -1
endi endi
if $data10 != @20-09-16 00:00:00.000@ then if $data10 != @20-09-16 00:00:00.000@ then
return -1 return -1
endi endi
if $data11 != 49.777777778 then if $data11 != 49.777777778 then
return -1 return -1
endi endi
if $data12 != 71680.000000000 then if $data12 != 71680.000000000 then
return -1 return -1
endi endi
...@@ -332,39 +288,28 @@ sql select bottom(x, 20) from (select c1 x from nest_tb0) ...@@ -332,39 +288,28 @@ sql select bottom(x, 20) from (select c1 x from nest_tb0)
print ===================> group by + having print ===================> group by + having
print =========================> ascending order/descending order print =========================> ascending order/descending order
print =========================> nest query join print =========================> nest query join
sql select a.ts,a.k,b.ts from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b where a.ts = b.ts ; sql select a.ts,a.k,b.ts from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b where a.ts = b.ts ;
if $rows != 10000 then if $rows != 10000 then
return -1 return -1
endi endi
if $data00 != @20-09-15 00:00:00.000@ then if $data00 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
if $data02 != @20-09-15 00:00:00.000@ then if $data02 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
if $data10 != @20-09-15 00:01:00.000@ then if $data10 != @20-09-15 00:01:00.000@ then
return -1 return -1
endi endi
if $data11 != 1 then if $data11 != 1 then
return -1 return -1
endi endi
if $data12 != @20-09-15 00:01:00.000@ then if $data12 != @20-09-15 00:01:00.000@ then
return -1 return -1
endi endi
...@@ -373,11 +318,9 @@ sql select sum(a.k), sum(b.f) from (select count(*) k from nest_tb0 interval(30a ...@@ -373,11 +318,9 @@ sql select sum(a.k), sum(b.f) from (select count(*) k from nest_tb0 interval(30a
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 10000 then if $data00 != 10000 then
return -1 return -1
endi endi
if $data01 != 10000 then if $data01 != 10000 then
return -1 return -1
endi endi
...@@ -386,19 +329,15 @@ sql select a.ts,a.k,b.ts,c.ts,c.ts,c.x from (select count(*) k from nest_tb0 int ...@@ -386,19 +329,15 @@ sql select a.ts,a.k,b.ts,c.ts,c.ts,c.x from (select count(*) k from nest_tb0 int
if $rows != 10000 then if $rows != 10000 then
return -1 return -1
endi endi
if $data00 != @20-09-15 00:00:00.000@ then if $data00 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
if $data02 != @20-09-15 00:00:00.000@ then if $data02 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
if $data03 != @20-09-15 00:00:00.000@ then if $data03 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
...@@ -407,11 +346,9 @@ sql select diff(val) from (select c1 val from nest_tb0); ...@@ -407,11 +346,9 @@ sql select diff(val) from (select c1 val from nest_tb0);
if $rows != 9999 then if $rows != 9999 then
return -1 return -1
endi endi
if $data00 != @70-01-01 08:00:00.000@ then if $data00 != @70-01-01 08:00:00.000@ then
return -1 return -1
endi endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
...@@ -425,19 +362,15 @@ sql select count(*),c1 from (select * from nest_tb0) where c1 < 2 group by c1; ...@@ -425,19 +362,15 @@ sql select count(*),c1 from (select * from nest_tb0) where c1 < 2 group by c1;
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != 100 then if $data00 != 100 then
return -1 return -1
endi endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data10 != 100 then if $data10 != 100 then
return -1 return -1
endi endi
if $data11 != 1 then if $data11 != 1 then
return -1 return -1
endi endi
...@@ -447,11 +380,9 @@ sql select twa(c1) from nest_tb1 interval(19a); ...@@ -447,11 +380,9 @@ sql select twa(c1) from nest_tb1 interval(19a);
if $rows != 10000 then if $rows != 10000 then
return -1 return -1
endi endi
if $data00 != @20-09-14 23:59:59.992@ then if $data00 != @20-09-14 23:59:59.992@ then
return -1 return -1
endi endi
if $data01 != 0.000083333 then if $data01 != 0.000083333 then
return -1 return -1
endi endi
...@@ -461,19 +392,15 @@ sql select min(val),max(val),first(val),last(val),count(val),sum(val),avg(val) f ...@@ -461,19 +392,15 @@ sql select min(val),max(val),first(val),last(val),count(val),sum(val),avg(val) f
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 10000 then if $data00 != 10000 then
return -1 return -1
endi endi
if $data01 != 10000 then if $data01 != 10000 then
return -1 return -1
endi endi
if $data04 != 10 then if $data04 != 10 then
return -1 return -1
endi endi
if $data05 != 100000 then if $data05 != 100000 then
return -1 return -1
endi endi
...@@ -487,19 +414,15 @@ sql select avg(k) from (select avg(k) k from t1 interval(1s)) interval(1m); ...@@ -487,19 +414,15 @@ sql select avg(k) from (select avg(k) k from t1 interval(1s)) interval(1m);
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != @20-01-01 01:01:00.000000@ then if $data00 != @20-01-01 01:01:00.000000@ then
return -1 return -1
endi endi
if $data01 != 1.000000000 then if $data01 != 1.000000000 then
return -1 return -1
endi endi
if $data10 != @20-01-01 01:02:00.000000@ then if $data10 != @20-01-01 01:02:00.000000@ then
return -1 return -1
endi endi
if $data11 != 2.000000000 then if $data11 != 2.000000000 then
return -1 return -1
endi endi
......
...@@ -102,11 +102,11 @@ $i = 1 ...@@ -102,11 +102,11 @@ $i = 1
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
## column type not identical ## column type not identical
sql_error select count(*) as a from union_mt0 union all select avg(c1) as a from union_mt0 sql select count(*) as a from union_mt0 union all select avg(c1) as a from union_mt0
sql_error select count(*) as a from union_mt0 union all select spread(c1) as a from union_mt0; sql select count(*) as a from union_mt0 union all select spread(c1) as a from union_mt0;
## union not supported ## union not supported
sql_error (select count(*) from union_mt0) union (select count(*) from union_mt0); sql (select count(*) from union_mt0) union (select count(*) from union_mt0);
## column type not identical ## column type not identical
sql_error select c1 from union_mt0 limit 10 union all select c2 from union_tb1 limit 20; sql_error select c1 from union_mt0 limit 10 union all select c2 from union_tb1 limit 20;
...@@ -123,145 +123,114 @@ sql (((select c1 from union_tb0))) ...@@ -123,145 +123,114 @@ sql (((select c1 from union_tb0)))
if $rows != 10000 then if $rows != 10000 then
return -1 return -1
endi endi
if $data00 != 0 then if $data00 != 0 then
return -1 return -1
endi endi
if $data10 != 1 then if $data10 != 1 then
return -1 return -1
endi endi
sql select 'ab' as options from union_tb1 limit 1 union all select 'dd' as options from union_tb0 limit 1; sql (select 'ab' as options from union_tb1 limit 1) union all (select 'dd' as options from union_tb0 limit 1) order by options;
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != @ab@ then if $data00 != @ab@ then
return -1 return -1
endi endi
if $data10 != @dd@ then if $data10 != @dd@ then
return -1 return -1
endi endi
sql (select 'ab12345' as options from union_tb1 limit 1) union all (select '1234567' as options from union_tb0 limit 1) order by options desc;
sql select 'ab' as options from union_tb1 limit 1 union all select '1234567' as options from union_tb0 limit 1;
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != @ab12345@ then
if $data00 != @ab@ then
return -1 return -1
endi endi
if $data10 != @1234567@ then if $data10 != @1234567@ then
return -1 return -1
endi endi
# mixed order # mixed order
sql select ts, c1 from union_tb1 order by ts asc limit 10 union all select ts, c1 from union_tb0 order by ts desc limit 2 union all select ts, c1 from union_tb2 order by ts asc limit 10 sql (select ts, c1 from union_tb1 order by ts asc limit 10) union all (select ts, c1 from union_tb0 order by ts desc limit 2) union all (select ts, c1 from union_tb2 order by ts asc limit 10) order by ts
if $rows != 22 then if $rows != 22 then
return -1 return -1
endi endi
if $data00 != @20-01-05 13:51:24.000@ then if $data00 != @20-01-05 13:51:24.000@ then
return -1 return -1
endi endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data10 != @20-01-05 13:51:24.000@ then
if $data10 != @20-01-05 13:52:24.000@ then
return -1 return -1
endi endi
if $data11 != 0 then
if $data11 != 1 then
return -1 return -1
endi endi
print $data90 $data91
if $data90 != @20-01-05 14:00:24.000@ then if $data90 != @20-01-05 13:55:24.000@ then
return -1 return -1
endi endi
if $data91 != 4 then
if $data91 != 9 then
return -1 return -1
endi endi
# different sort order # different sort order
# super table & normal table mixed up # super table & normal table mixed up
sql select c3 from union_tb0 limit 2 union all select sum(c1) as c3 from union_mt0; sql (select c3 from union_tb0 limit 2) union all (select sum(c1) as c3 from union_mt0) order by c3;
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
if $data00 != 0 then if $data00 != 0 then
return -1 return -1
endi endi
if $data10 != 1 then if $data10 != 1 then
return -1 return -1
endi endi
if $data20 != 4950000 then if $data20 != 4950000 then
return -1 return -1
endi endi
# type compatible # type compatible
sql select c3 from union_tb0 limit 2 union all select sum(c1) as c3 from union_tb1; sql (select c3 from union_tb0 limit 2) union all (select sum(c1) as c3 from union_tb1) order by c3;
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
if $data00 != 0 then if $data00 != 0 then
return -1 return -1
endi endi
if $data10 != 1 then if $data10 != 1 then
return -1 return -1
endi endi
if $data20 != 495000 then if $data20 != 495000 then
return -1 return -1
endi endi
# two join subclause # two join subclause
sql select count(*) as c from union_tb0, union_tb1 where union_tb0.ts=union_tb1.ts union all select union_tb0.c3 as c from union_tb0, union_tb1 where union_tb0.ts=union_tb1.ts limit 10 sql (select count(*) as c from union_tb0, union_tb1 where union_tb0.ts=union_tb1.ts) union all (select union_tb0.c3 as c from union_tb0, union_tb1 where union_tb0.ts=union_tb1.ts limit 10) order by c desc
if $rows != 11 then if $rows != 11 then
return -1 return -1
endi endi
if $data00 != 10000 then if $data00 != 10000 then
return -1 return -1
endi endi
if $data10 != 9 then
if $data10 != 0 then
return -1 return -1
endi endi
if $data20 != 8 then
if $data20 != 1 then
return -1 return -1
endi endi
if $data90 != 1 then
if $data90 != 8 then
return -1 return -1
endi endi
print ===========================================tags union print ===========================================tags union
# two super table tag union, limit is not active during retrieve tags query # two super table tag union, limit is not active during retrieve tags query
sql select t1 from union_mt0 union all select t1 from union_mt0 sql (select t1 from union_mt0) union all (select t1 from union_mt0)
if $rows != 20 then if $rows != 200000 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data90 != 9 then
return -1 return -1
endi endi
...@@ -271,39 +240,35 @@ endi ...@@ -271,39 +240,35 @@ endi
#endi #endi
#========================================== two super table join subclause #========================================== two super table join subclause
print ================two super table join subclause print ================two super table join subclause
sql select avg(union_mt0.c1) as c from union_mt0 interval(1h) limit 10 union all select union_mt1.ts, union_mt1.c1/1.0 as c from union_mt0, union_mt1 where union_mt1.ts=union_mt0.ts and union_mt1.t1=union_mt0.t1 limit 5; sql (select _wstart as ts, avg(union_mt0.c1) as c from union_mt0 interval(1h) limit 10) union all (select union_mt1.ts, union_mt1.c1/1.0 as c from union_mt0, union_mt1 where union_mt1.ts=union_mt0.ts and union_mt1.t1=union_mt0.t1 limit 5);
print the rows value is: $rows print the rows value is: $rows
if $rows != 15 then if $rows != 15 then
return -1 return -1
endi endi
# first subclause are empty # first subclause are empty
sql select count(*) as c from union_tb0 where ts > now + 3650d union all select sum(c1) as c from union_tb1; sql (select count(*) as c from union_tb0 where ts > now + 3650d) union all (select sum(c1) as c from union_tb1);
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 495000 then if $data00 != 495000 then
return -1 return -1
endi endi
# all subclause are empty # all subclause are empty
sql select c1 from union_tb0 limit 0 union all select c1 from union_tb1 where ts>'2021-1-1 0:0:0' sql (select c1 from union_tb0 limit 0) union all (select c1 from union_tb1 where ts>'2021-1-1 0:0:0')
if $rows != 0 then if $rows != 0 then
return -1 return -1
endi endi
# middle subclause empty # middle subclause empty
sql select c1 from union_tb0 limit 1 union all select c1 from union_tb1 where ts>'2030-1-1 0:0:0' union all select last(c1) as c1 from union_tb1; sql (select c1 from union_tb0 limit 1) union all (select c1 from union_tb1 where ts>'2030-1-1 0:0:0' union all select last(c1) as c1 from union_tb1) order by c1;
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != 0 then if $data00 != 0 then
return -1 return -1
endi endi
if $data10 != 99 then if $data10 != 99 then
return -1 return -1
endi endi
...@@ -319,141 +284,90 @@ sql (select ts, c1 from union_mt0 limit 1) union all (select ts, c1 from union_m ...@@ -319,141 +284,90 @@ sql (select ts, c1 from union_mt0 limit 1) union all (select ts, c1 from union_m
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != @20-01-05 13:51:24.000@ then if $data00 != @20-01-05 13:51:24.000@ then
return -1 return -1
endi endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data10 != @20-01-05 13:51:24.000@ then if $data10 != @20-01-05 13:51:24.000@ then
return -1 return -1
endi endi
if $data11 != 0 then if $data11 != 0 then
return -1 return -1
endi endi
# two aggregated functions for super tables # two aggregated functions for super tables
sql select sum(c1) as a from union_mt0 interval(1s) limit 9 union all select ts, max(c3) as a from union_mt0 limit 2; sql (select _wstart as ts, sum(c1) as a from union_mt0 interval(1s) limit 9) union all (select ts, max(c3) as a from union_mt0 limit 2) order by ts;
if $rows != 10 then if $rows != 10 then
return -1 return -1
endi endi
if $data00 != @20-01-05 13:51:24.000@ then if $data00 != @20-01-05 13:51:24.000@ then
return -1 return -1
endi endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data10 != @20-01-05 13:52:24.000@ then if $data10 != @20-01-05 13:52:24.000@ then
return -1 return -1
endi endi
if $data11 != 10 then if $data11 != 10 then
return -1 return -1
endi endi
if $data20 != @20-01-05 13:53:24.000@ then if $data20 != @20-01-05 13:53:24.000@ then
return -1 return -1
endi endi
if $data21 != 20 then if $data21 != 20 then
return -1 return -1
endi endi
if $data90 != @20-01-05 15:30:24.000@ then if $data90 != @20-01-05 15:30:24.000@ then
return -1 return -1
endi endi
if $data91 != 99 then if $data91 != 99 then
return -1 return -1
endi endi
#================================================================================================= #=================================================================================================
# two aggregated functions for normal tables # two aggregated functions for normal tables
sql select sum(c1) as a from union_tb0 limit 1 union all select sum(c3) as a from union_tb1 limit 2; sql (select sum(c1) as a from union_tb0 limit 1) union all (select sum(c3) as a from union_tb1 limit 2);
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != 495000 then if $data00 != 495000 then
return -1 return -1
endi endi
if $data10 != 495000 then if $data10 != 495000 then
return -1 return -1
endi endi
# two super table query + interval + limit # two super table query + interval + limit
sql select ts, first(c3) as a from union_mt0 limit 1 union all select sum(c3) as a from union_mt0 interval(1h) limit 1; sql (select ts, first(c3) as a from union_mt0 limit 1) union all (select _wstart as ts, sum(c3) as a from union_mt0 interval(1h) limit 1) order by ts desc;
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != @20-01-05 13:51:24.000@ then if $data00 != @20-01-05 13:51:24.000@ then
return -1 return -1
endi endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data10 != @20-01-05 13:00:00.000@ then if $data10 != @20-01-05 13:00:00.000@ then
return -1 return -1
endi endi
if $data11 != 360 then if $data11 != 360 then
return -1 return -1
endi endi
sql select server_status() union all select server_status() sql (select 'aaa' as option from union_tb1 where c1 < 0 limit 1) union all (select 'bbb' as option from union_tb0 limit 1)
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 1 then
return -1
endi
sql select client_version() union all select server_version()
if $rows != 2 then
return -1
endi
sql select database() union all select database()
if $rows != 2 then
return -1
endi
if $data00 != @union_db0@ then
return -1
endi
if $data10 != @union_db0@ then
return -1
endi
sql select 'aaa' as option from union_tb1 where c1 < 0 limit 1 union all select 'bbb' as option from union_tb0 limit 1
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != @bbb@ then if $data00 != @bbb@ then
return -1 return -1
endi endi
sql_error (show tables) union all (show tables)
sql_error show tables union all show tables sql_error (show stables) union all (show stables)
sql_error show stables union all show stables sql_error (show databases) union all (show databases)
sql_error show databases union all show databases
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql (select server_status()) union all (select server_status())
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 1 then
return -1
endi
sql (select client_version()) union all (select server_version())
if $rows != 2 then
return -1
endi
sql (select database()) union all (select database())
if $rows != 2 then
return -1
endi
if $data00 != @union_db0@ then
return -1
endi
if $data10 != @union_db0@ then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -214,6 +214,24 @@ class TDTestCase: ...@@ -214,6 +214,24 @@ class TDTestCase:
tdSql.checkRows((row_num-i)*tb_num) tdSql.checkRows((row_num-i)*tb_num)
for j in range(tb_num): for j in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data) self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data)
for i in range(row_num):
tdSql.execute(f'delete from {tbname} where ts between {self.ts} and {self.ts+i}')
tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache')
tdSql.query(f'select {col_name} from {tbname}')
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(row_num - i-1)
self.insert_base_data(col_type,tbname,row_num,base_data)
elif tb_type == 'stb':
tdSql.checkRows(tb_num*(row_num - i-1))
for j in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data)
tdSql.execute(f'delete from {tbname} where ts between {self.ts+i+1} and {self.ts}')
tdSql.query(f'select {col_name} from {tbname}')
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(row_num)
elif tb_type == 'stb':
tdSql.checkRows(tb_num*row_num)
def delete_error(self,tbname,column_name,column_type,base_data): def delete_error(self,tbname,column_name,column_type,base_data):
for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']: for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']:
if 'binary' in column_type.lower(): if 'binary' in column_type.lower():
...@@ -221,7 +239,8 @@ class TDTestCase: ...@@ -221,7 +239,8 @@ class TDTestCase:
elif 'nchar' in column_type.lower(): elif 'nchar' in column_type.lower():
tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''') tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''')
else: else:
tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}')
def delete_data_ntb(self): def delete_data_ntb(self):
tdSql.execute(f'create database if not exists {self.dbname}') tdSql.execute(f'create database if not exists {self.dbname}')
tdSql.execute(f'use {self.dbname}') tdSql.execute(f'use {self.dbname}')
......
...@@ -81,39 +81,63 @@ class TDTestCase: ...@@ -81,39 +81,63 @@ class TDTestCase:
if col_type.lower() == 'double': if col_type.lower() == 'double':
for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.DOUBLE_MIN,1.1*constant.DOUBLE_MAX]: for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.DOUBLE_MIN,1.1*constant.DOUBLE_MAX]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'float': elif col_type.lower() == 'float':
for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.FLOAT_MIN,1.1*constant.FLOAT_MAX]: for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.FLOAT_MIN,1.1*constant.FLOAT_MAX]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif 'binary' in col_type.lower() or 'nchar' in col_type.lower(): elif 'binary' in col_type.lower() or 'nchar' in col_type.lower():
for error_value in [tdCom.getLongName(str_length)]: for error_value in [tdCom.getLongName(str_length)]:
tdSql.error(f'insert into {tbname} values({self.ts},"{error_value}")') tdSql.error(f'insert into {tbname} values({self.ts},"{error_value}")')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'bool': elif col_type.lower() == 'bool':
for error_value in [tdCom.getLongName(self.str_length)]: for error_value in [tdCom.getLongName(self.str_length)]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'tinyint': elif col_type.lower() == 'tinyint':
for error_value in [constant.TINYINT_MIN-1,constant.TINYINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.TINYINT_MIN-1,constant.TINYINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'smallint': elif col_type.lower() == 'smallint':
for error_value in [constant.SMALLINT_MIN-1,constant.SMALLINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.SMALLINT_MIN-1,constant.SMALLINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'int': elif col_type.lower() == 'int':
for error_value in [constant.INT_MIN-1,constant.INT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.INT_MIN-1,constant.INT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'bigint': elif col_type.lower() == 'bigint':
for error_value in [constant.BIGINT_MIN-1,constant.BIGINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.BIGINT_MIN-1,constant.BIGINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'tinyint unsigned': elif col_type.lower() == 'tinyint unsigned':
for error_value in [constant.TINYINT_UN_MIN-1,constant.TINYINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.TINYINT_UN_MIN-1,constant.TINYINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'smallint unsigned': elif col_type.lower() == 'smallint unsigned':
for error_value in [constant.SMALLINT_UN_MIN-1,constant.SMALLINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.SMALLINT_UN_MIN-1,constant.SMALLINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'int unsigned': elif col_type.lower() == 'int unsigned':
for error_value in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'bigint unsigned': elif col_type.lower() == 'bigint unsigned':
for error_value in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
tdSql.execute(f'drop table {tbname}') tdSql.execute(f'drop table {tbname}')
if tb_type == 'ctb': if tb_type == 'ctb':
tdSql.execute(f'drop table {stbname}') tdSql.execute(f'drop table {stbname}')
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import random
import string
from numpy import logspace
from util import constant
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.dbname = 'db_test'
self.ntbname = 'ntb'
self.stbname = 'stb'
self.rowNum = 10
self.tbnum = 5
self.ts = 1537146000000
self.str_length = 20
self.column_dict = {
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': f'binary({self.str_length})',
'col13': f'nchar({self.str_length})'
}
self.tinyint_val = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX)
self.smallint_val = random.randint(constant.SMALLINT_MIN,constant.SMALLINT_MAX)
self.int_val = random.randint(constant.INT_MIN,constant.INT_MAX)
self.bigint_val = random.randint(constant.BIGINT_MIN,constant.BIGINT_MAX)
self.untingint_val = random.randint(constant.TINYINT_UN_MIN,constant.TINYINT_UN_MAX)
self.unsmallint_val = random.randint(constant.SMALLINT_UN_MIN,constant.SMALLINT_UN_MAX)
self.unint_val = random.randint(constant.INT_UN_MIN,constant.INT_MAX)
self.unbigint_val = random.randint(constant.BIGINT_UN_MIN,constant.BIGINT_UN_MAX)
self.float_val = random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX)
self.double_val = random.uniform(constant.DOUBLE_MIN*(1E-300),constant.DOUBLE_MAX*(1E-300))
self.bool_val = random.randint(0,2)%2
self.binary_val = tdCom.getLongName(random.randint(0,self.str_length))
self.nchar_val = tdCom.getLongName(random.randint(0,self.str_length))
self.data = {
'tinyint':self.tinyint_val,
'smallint':self.smallint_val,
'int':self.int_val,
'bigint':self.bigint_val,
'tinyint unsigned':self.untingint_val,
'smallint unsigned':self.unsmallint_val,
'int unsigned':self.unint_val,
'bigint unsigned':self.unbigint_val,
'bool':self.bool_val,
'float':self.float_val,
'double':self.double_val,
'binary':self.binary_val,
'nchar':self.nchar_val
}
def update_data(self,dbname,tbname,tb_num,rows,values,col_type):
sql = f'insert into '
for j in range(tb_num):
sql += f'{dbname}.{tbname}_{j} values'
for i in range(rows):
if 'binary' in col_type.lower() or 'nchar' in col_type.lower():
sql += f'({self.ts+i},"{values}")'
else:
sql += f'({self.ts+i},{values})'
sql += ' '
tdSql.execute(sql)
def insert_data(self,col_type,tbname,rows,data):
for i in range(rows):
if col_type.lower() == 'tinyint':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["tinyint"]})')
elif col_type.lower() == 'smallint':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["smallint"]})')
elif col_type.lower() == 'int':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["int"]})')
elif col_type.lower() == 'bigint':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bigint"]})')
elif col_type.lower() == 'tinyint unsigned':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["tinyint unsigned"]})')
elif col_type.lower() == 'smallint unsigned':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["smallint unsigned"]})')
elif col_type.lower() == 'int unsigned':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["int unsigned"]})')
elif col_type.lower() == 'bigint unsigned':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bigint unsigned"]})')
elif col_type.lower() == 'bool':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bool"]})')
elif col_type.lower() == 'float':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["float"]})')
elif col_type.lower() == 'double':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["double"]})')
elif 'binary' in col_type.lower():
tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{data['binary']}")''')
elif 'nchar' in col_type.lower():
tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{data['nchar']}")''')
def data_check(self,dbname,tbname,tbnum,rownum,data,col_name,col_type):
if 'binary' in col_type.lower():
self.update_data(dbname,f'{tbname}',tbnum,rownum,data['binary'],col_type)
elif 'nchar' in col_type.lower():
self.update_data(dbname,f'{tbname}',tbnum,rownum,data['nchar'],col_type)
else:
self.update_data(dbname,f'{tbname}',tbnum,rownum,data[col_type],col_type)
tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache')
for i in range(self.tbnum):
tdSql.query(f'select {col_name} from {dbname}.{tbname}_{i}')
for j in range(rownum):
if col_type.lower() == 'float' or col_type.lower() == 'double':
if abs(tdSql.queryResult[j][0] - data[col_type]) / data[col_type] <= 0.0001:
tdSql.checkEqual(tdSql.queryResult[j][0],tdSql.queryResult[j][0])
elif 'binary' in col_type.lower():
tdSql.checkEqual(tdSql.queryResult[j][0],data['binary'])
elif 'nchar' in col_type.lower():
tdSql.checkEqual(tdSql.queryResult[j][0],data['nchar'])
else:
tdSql.checkEqual(tdSql.queryResult[j][0],data[col_type])
def update_data_ntb(self):
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database {self.dbname}')
tdSql.execute(f'use {self.dbname}')
for col_name,col_type in self.column_dict.items():
for i in range(self.tbnum):
tdSql.execute(f'create table {self.dbname}.{self.ntbname}_{i} (ts timestamp,{col_name} {col_type})')
for j in range(self.rowNum):
tdSql.execute(f'insert into {self.dbname}.{self.ntbname}_{i} values({self.ts+j},null)' )
tdSql.execute(f'flush database {self.dbname}')
tdSql.execute('reset query cache')
self.data_check(self.dbname,self.ntbname,self.tbnum,self.rowNum,self.data,col_name,col_type)
for i in range(self.tbnum):
tdSql.execute(f'drop table {self.ntbname}_{i}')
def update_data_ctb(self):
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database {self.dbname}')
tdSql.execute(f'use {self.dbname}')
for col_name,col_type in self.column_dict.items():
tdSql.execute(f'create table {self.dbname}.{self.stbname} (ts timestamp,{col_name} {col_type}) tags(t0 int)')
for i in range(self.tbnum):
tdSql.execute(f'create table {self.dbname}.{self.stbname}_{i} using {self.dbname}.{self.stbname} tags(1)')
for j in range(self.rowNum):
tdSql.execute(f'insert into {self.dbname}.{self.stbname}_{i} values({self.ts+j},null)' )
tdSql.execute(f'flush database {self.dbname}')
tdSql.execute('reset query cache')
self.data_check(self.dbname,self.stbname,self.tbnum,self.rowNum,self.data,col_name,col_type)
tdSql.execute(f'drop table {self.stbname}')
def run(self):
self.update_data_ntb()
self.update_data_ctb()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -94,17 +94,15 @@ class TDTestCase: ...@@ -94,17 +94,15 @@ class TDTestCase:
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})')
#!TODO tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})')
# tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})') tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
# tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
tdSql.query(f'select count(tbname) from {self.stbname}') tdSql.query(f'select count(tbname) from {self.stbname}')
tdSql.checkRows(0) tdSql.checkRows(0)
tdSql.execute('flush database db') tdSql.execute('flush database db')
tdSql.query(f'select count(tbname) from {self.stbname}') tdSql.query(f'select count(tbname) from {self.stbname}')
tdSql.checkRows(0) tdSql.checkRows(0)
#!TODO tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})')
# tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})') tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
# tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
for i in range(self.tbnum): for i in range(self.tbnum):
self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum) self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum)
self.count_query_stb(self.column_dict,self.tag_dict,self.stbname,self.tbnum,self.rowNum) self.count_query_stb(self.column_dict,self.tag_dict,self.stbname,self.tbnum,self.rowNum)
......
...@@ -20,6 +20,8 @@ import threading ...@@ -20,6 +20,8 @@ import threading
import requests import requests
import time import time
# import socketfrom # import socketfrom
import json
import toml
import taos import taos
from util.log import * from util.log import *
...@@ -207,7 +209,7 @@ class TMQCom: ...@@ -207,7 +209,7 @@ class TMQCom:
def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0): def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0):
for _ in range(count): for _ in range(count):
create_ctable_sql = f'drop table {dbname}.{default_ctbname_prefix}{ctbStartIdx};' create_ctable_sql = f'drop table if exists {dbname}.{default_ctbname_prefix}{ctbStartIdx};'
ctbStartIdx += 1 ctbStartIdx += 1
tdLog.info("drop ctb sql: %s"%create_ctable_sql) tdLog.info("drop ctb sql: %s"%create_ctable_sql)
tsql.execute(create_ctable_sql) tsql.execute(create_ctable_sql)
...@@ -503,6 +505,37 @@ class TMQCom: ...@@ -503,6 +505,37 @@ class TMQCom:
break break
return return
def create_ntable(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=None, colPrefix='c', tblNum=1, **kwargs):
tb_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
tb_params += f'{param} "{value}" '
column_type_str = tdCom.gen_column_type_str(colPrefix, column_elm_list)
for _ in range(tblNum):
create_table_sql = f'create table {dbname}.{tbname_prefix}{tbname_index_start_num} ({column_type_str}) {tb_params};'
tbname_index_start_num += 1
tsql.execute(create_table_sql)
def insert_rows_into_ntbl(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=None, startTs=None, tblNum=1, rows=1):
if startTs is None:
startTs = tdCom.genTs()[0]
for tblIdx in range(tblNum):
for rowIdx in range(rows):
column_value_list = tdCom.gen_column_value_list(column_ele_list, f'{startTs}+{rowIdx}s')
column_value_str = ''
idx = 0
for column_value in column_value_list:
if isinstance(column_value, str) and idx != 0:
column_value_str += f'"{column_value}", '
else:
column_value_str += f'{column_value}, '
idx += 1
column_value_str = column_value_str.rstrip()[:-1]
insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
tsql.execute(insert_sql)
def close(self): def close(self):
self.cursor.close() self.cursor.close()
......
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 4
self.ctbNum = 100
self.rowsPerTbl = 10
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def waitSubscriptionExit(self, max_wait_count=20):
wait_cnt = 0
while (wait_cnt < max_wait_count):
tdSql.query("show subscriptions")
if tdSql.getRows() == 0:
break
else:
time.sleep(1)
wait_cnt += 1
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
# drop some ntbs
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ntb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0,
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdLog.info("start create database....")
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("start create normal tables....")
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("create topics from database")
topicFromDb = 'topic_dbt'
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
if self.snapshot == 0:
consumerId = 0
elif self.snapshot == 1:
consumerId = 1
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
topicList = topicFromDb
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop some ntables")
# drop 1/4 ctbls from half offset
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
tdLog.printNoPrefix("======== test case 1 end ...... ")
# drop some ntbs and create some new ntbs
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ntb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0,
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdLog.info("start create database....")
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("start create normal tables....")
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("create topics from database")
topicFromDb = 'topic_dbt'
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
if self.snapshot == 0:
consumerId = 2
elif self.snapshot == 1:
consumerId = 3
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromDb
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop some ntables")
# drop 1/4 ctbls from half offset
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
tdLog.info("start create some new normal tables....")
paraDict["ctbPrefix"] = 'newCtb'
paraDict["ctbNum"] = self.ctbNum
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into these new normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0
# self.tmqCase1()
self.tmqCase2()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
# self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -32,6 +32,9 @@ python3 ./test.py -f 1-insert/block_wise.py ...@@ -32,6 +32,9 @@ python3 ./test.py -f 1-insert/block_wise.py
python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/create_retentions.py
python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 1-insert/update_data_muti_rows.py
python3 ./test.py -f 2-query/abs.py python3 ./test.py -f 2-query/abs.py
python3 ./test.py -f 2-query/abs.py -R python3 ./test.py -f 2-query/abs.py -R
python3 ./test.py -f 2-query/and_or_for_byte.py python3 ./test.py -f 2-query/and_or_for_byte.py
...@@ -59,7 +62,9 @@ python3 ./test.py -f 2-query/char_length.py -R ...@@ -59,7 +62,9 @@ python3 ./test.py -f 2-query/char_length.py -R
python3 ./test.py -f 2-query/check_tsdb.py python3 ./test.py -f 2-query/check_tsdb.py
python3 ./test.py -f 2-query/check_tsdb.py -R python3 ./test.py -f 2-query/check_tsdb.py -R
# jira python3 ./test.py -f 1-insert/update_data.py
python3 ./test.py -f 1-insert/update_data.py
python3 ./test.py -f 1-insert/delete_data.py python3 ./test.py -f 1-insert/delete_data.py
python3 ./test.py -f 2-query/db.py python3 ./test.py -f 2-query/db.py
......
Subproject commit df8678f070e3f707faf59baebec90065f6e1268b Subproject commit d8f19ede56f1f489c5d2ac8f963cced01e68ecef
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册