提交 9773a14a 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/tsim

...@@ -68,7 +68,7 @@ typedef struct { ...@@ -68,7 +68,7 @@ typedef struct {
typedef struct { typedef struct {
char* qmsg; char* qmsg;
qTaskInfo_t task[5]; qTaskInfo_t task;
} STqExecCol; } STqExecCol;
typedef struct { typedef struct {
...@@ -82,7 +82,7 @@ typedef struct { ...@@ -82,7 +82,7 @@ typedef struct {
typedef struct { typedef struct {
int8_t subType; int8_t subType;
STqReader* pExecReader[5]; STqReader* pExecReader;
union { union {
STqExecCol execCol; STqExecCol execCol;
STqExecTb execTb; STqExecTb execTb;
...@@ -139,8 +139,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -139,8 +139,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId); int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp);
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
// tqMeta // tqMeta
......
...@@ -146,7 +146,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId); ...@@ -146,7 +146,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
......
...@@ -262,7 +262,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su ...@@ -262,7 +262,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; } static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; }
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t timeout = pReq->timeout; int64_t timeout = pReq->timeout;
...@@ -271,9 +271,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -271,9 +271,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal reqOffset = pReq->reqOffset;
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
// todo
workerId = 0;
// 1.find handle // 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/ /*ASSERT(pHandle);*/
...@@ -405,7 +402,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -405,7 +402,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp, workerId) < 0) { if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) {
/*ASSERT(0);*/ /*ASSERT(0);*/
} }
// TODO batch optimization: // TODO batch optimization:
...@@ -518,7 +515,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -518,7 +515,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver; pHandle->snapshotVer = ver;
req.qmsg = NULL; req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) {
SReadHandle handle = { SReadHandle handle = {
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
...@@ -526,19 +522,16 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -526,19 +522,16 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.initTqReader = true, .initTqReader = true,
.version = ver, .version = ver,
}; };
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, pHandle->execHandle.execCol.task =
&pHandle->execHandle.pSchemaWrapper); qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner);
ASSERT(scanner); ASSERT(scanner);
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader[i]); ASSERT(pHandle->execHandle.pExecReader);
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) { pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
}
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
...@@ -550,10 +543,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -550,10 +543,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
} }
for (int32_t i = 0; i < 5; i++) { pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
}
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
} }
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
......
...@@ -37,8 +37,8 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, ...@@ -37,8 +37,8 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp,
return 0; return 0;
} }
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataRsp* pRsp) { static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
if (pSW == NULL) { if (pSW == NULL) {
return -1; return -1;
} }
...@@ -61,7 +61,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { ...@@ -61,7 +61,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle; const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->execCol.task[0]; qTaskInfo_t task = pExec->execCol.task;
if (qStreamPrepareScan(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset) < 0) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
...@@ -89,7 +89,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -89,7 +89,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
...@@ -184,12 +184,12 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ...@@ -184,12 +184,12 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
} }
#endif #endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) {
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReader* pReader = pExec->pExecReader[workerId]; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
...@@ -197,18 +197,18 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -197,18 +197,18 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReader* pReader = pExec->pExecReader[workerId]; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
...@@ -216,13 +216,13 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -216,13 +216,13 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
} }
......
...@@ -80,11 +80,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -80,11 +80,7 @@ int32_t tqMetaOpen(STQ* pTq) {
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle); tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
/*for (int32_t i = 0; i < 5; i++) {*/
/*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) {
SReadHandle reader = { SReadHandle reader = {
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
...@@ -93,15 +89,14 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -93,15 +89,14 @@ int32_t tqMetaOpen(STQ* pTq) {
.version = handle.snapshotVer, .version = handle.snapshotVer,
}; };
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, handle.execHandle.execCol.task =
&handle.execHandle.pSchemaWrapper); qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.execCol.task[i]); ASSERT(handle.execHandle.execCol.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
ASSERT(scanner); ASSERT(scanner);
handle.execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader[i]); ASSERT(handle.execHandle.pExecReader);
}
} else { } else {
handle.execHandle.execDb.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......
...@@ -394,10 +394,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -394,10 +394,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) { int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task, tbUidList, isAdd);
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd);
ASSERT(code == 0); ASSERT(code == 0);
}
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
if (!isAdd) { if (!isAdd) {
int32_t sz = taosArrayGetSize(tbUidList); int32_t sz = taosArrayGetSize(tbUidList);
......
...@@ -127,6 +127,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -127,6 +127,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
tqDebug("tq sink, convert block %d, rows: %d", i, rows);
int32_t dataLen = 0; int32_t dataLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
...@@ -178,11 +180,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -178,11 +180,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data; const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
tqDebug("task write into table, vgId %d, block num: %d", pVnode->config.vgId, (int32_t)pRes->size); tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId); pTask->tbSink.stbFullName, pVnode->config.vgId);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/ /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg // build write msg
SRpcMsg msg = { SRpcMsg msg = {
......
...@@ -316,7 +316,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -316,7 +316,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TABLE_CFG: case TDMT_VND_TABLE_CFG:
return vnodeGetTableCfg(pVnode, pMsg); return vnodeGetTableCfg(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH:
......
...@@ -229,8 +229,8 @@ typedef struct { ...@@ -229,8 +229,8 @@ typedef struct {
int8_t stop; int8_t stop;
} SAsyncPool; } SAsyncPool;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool); void transAsyncPoolDestroy(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq); int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool); bool transAsyncPoolIsEmpty(SAsyncPool* pool);
...@@ -322,7 +322,7 @@ typedef struct STransReq { ...@@ -322,7 +322,7 @@ typedef struct STransReq {
} STransReq; } STransReq;
void transReqQueueInit(queue* q); void transReqQueueInit(queue* q);
void* transReqQueuePushReq(queue* q); void* transReqQueuePush(queue* q);
void* transReqQueueRemove(void* arg); void* transReqQueueRemove(void* arg);
void transReqQueueClear(queue* q); void transReqQueueClear(queue* q);
...@@ -395,7 +395,7 @@ typedef struct SDelayQueue { ...@@ -395,7 +395,7 @@ typedef struct SDelayQueue {
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b); bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
/* /*
......
...@@ -26,7 +26,7 @@ typedef struct SCliConn { ...@@ -26,7 +26,7 @@ typedef struct SCliConn {
SConnBuffer readBuf; SConnBuffer readBuf;
STransQueue cliMsgs; STransQueue cliMsgs;
queue conn; queue q;
uint64_t expireTime; uint64_t expireTime;
STransCtx ctx; STransCtx ctx;
...@@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) { ...@@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) {
while (p != NULL) { while (p != NULL) {
while (!QUEUE_IS_EMPTY(&p->conn)) { while (!QUEUE_IS_EMPTY(&p->conn)) {
queue* h = QUEUE_HEAD(&p->conn); queue* h = QUEUE_HEAD(&p->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); SCliConn* c = QUEUE_DATA(h, SCliConn, q);
if (c->expireTime < currentTime) { if (c->expireTime < currentTime) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
transUnrefCliHandle(c); transUnrefCliHandle(c);
...@@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) { ...@@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) {
while (connList != NULL) { while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) { while (!QUEUE_IS_EMPTY(&connList->conn)) {
queue* h = QUEUE_HEAD(&connList->conn); queue* h = QUEUE_HEAD(&connList->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true); cliDestroyConn(c, true);
} }
connList = taosHashIterate((SHashObj*)pool, connList); connList = taosHashIterate((SHashObj*)pool, connList);
...@@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
return NULL; return NULL;
} }
queue* h = QUEUE_HEAD(&plist->conn); queue* h = QUEUE_HEAD(&plist->conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal; conn->status = ConnNormal;
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->q);
assert(h == &conn->conn); assert(h == &conn->q);
return conn; return conn;
} }
static int32_t allocConnRef(SCliConn* conn, bool update) { static int32_t allocConnRef(SCliConn* conn, bool update) {
...@@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->q);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&plist->conn)); assert(!QUEUE_IS_EMPTY(&plist->conn));
} }
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
...@@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
transReqQueueInit(&conn->wreqQueue); transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL); transQueueInit(&conn->cliMsgs, NULL);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->status = ConnNormal; conn->status = ConnNormal;
conn->broken = 0; conn->broken = 0;
...@@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
} }
static void cliDestroyConn(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->q);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
...@@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) { ...@@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
_RETURN: _RETURN:
...@@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() { ...@@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer); uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd; pThrd->timer.data = pThrd;
...@@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
CLI_RELEASE_UV(pThrd->loop); CLI_RELEASE_UV(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx); taosThreadMutexDestroy(&pThrd->msgMtx);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
transDestroyAsyncPool(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsg); transDQDestroy(pThrd->delayQueue, destroyCmsg);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
...@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) { ...@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) {
cliHandleReq(pMsg, pThrd); cliHandleReq(pMsg, pThrd);
} }
static void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
SCliThrd* pThrd = arg->param2;
}
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
...@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { ...@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
} }
} }
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
if ((pResp == NULL || pResp->info.hasEpSet == 0)) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
return false; return false;
} }
...@@ -1116,6 +1122,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1116,6 +1122,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
*/ */
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code; int32_t code = pResp->code;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
if (retry) { if (retry) {
pMsg->sent = 0; pMsg->sent = 0;
...@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryCnt < pCtx->retryLimit) { if (pCtx->retryCnt < pCtx->retryLimit) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(&pCtx->epSet);
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd); cliSchedMsgToNextNode(pMsg, pThrd);
return -1; return -1;
} }
...@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId* trace = &pResp->info.traceId; STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryToExtractEpSet(pResp, &pCtx->epSet); bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) { if (hasEpSet) {
char tbuf[256] = {0}; char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf); EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
...@@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs ...@@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) { int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
tsem_destroy(sem); if (ret != 0) {
taosMemoryFree(sem);
destroyCmsg(cliMsg); destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); goto _RETURN;
return -1;
} }
tsem_wait(sem); tsem_wait(sem);
_RETURN:
tsem_destroy(sem); tsem_destroy(sem);
taosMemoryFree(sem); taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0; return ret;
} }
/* /*
* *
......
...@@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) { ...@@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) {
return ret; return ret;
} }
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
pool->nAsync = sz; pool->nAsync = sz;
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
...@@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) ...@@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
return pool; return pool;
} }
void transDestroyAsyncPool(SAsyncPool* pool) { void transAsyncPoolDestroy(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) { for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]); uv_async_t* async = &(pool->asyncs[i]);
// uv_close((uv_handle_t*)async, NULL); // uv_close((uv_handle_t*)async, NULL);
...@@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) { ...@@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree(pool->asyncs); taosMemoryFree(pool->asyncs);
taosMemoryFree(pool); taosMemoryFree(pool);
} }
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
}
return true;
}
int transAsyncSend(SAsyncPool* pool, queue* q) { int transAsyncSend(SAsyncPool* pool, queue* q) {
if (atomic_load_8(&pool->stop) == 1) { if (atomic_load_8(&pool->stop) == 1) {
return -1; return -1;
...@@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { ...@@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
} }
return uv_async_send(async); return uv_async_send(async);
} }
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
}
return true;
}
void transCtxInit(STransCtx* ctx) { void transCtxInit(STransCtx* ctx) {
// init transCtx // init transCtx
...@@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) { ...@@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) {
// init req queue // init req queue
QUEUE_INIT(q); QUEUE_INIT(q);
} }
void* transReqQueuePushReq(queue* q) { void* transReqQueuePush(queue* q) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq)); STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
wreq->data = req; wreq->data = req;
...@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { ...@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
heapDestroy(queue->heap); heapDestroy(queue->heap);
taosMemoryFree(queue); taosMemoryFree(queue);
} }
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
uv_timer_stop(queue->timer);
if (heapSize(queue->heap) <= 0) return;
heapRemove(queue->heap, &task->node);
if (heapSize(queue->heap) != 0) {
HeapNode* minNode = heapMin(queue->heap);
if (minNode != NULL) return;
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { uint64_t now = taosGetTimestampMs();
SDelayTask* task = container_of(minNode, SDelayTask, node);
uint64_t timeout = now > task->execTime ? now - task->execTime : 0;
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
}
}
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
uint64_t now = taosGetTimestampMs(); uint64_t now = taosGetTimestampMs();
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
task->func = func; task->func = func;
...@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ ...@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs); tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
heapInsert(queue->heap, &task->node); heapInsert(queue->heap, &task->node);
uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
return 0; return task;
} }
void transPrintEpSet(SEpSet* pEpSet) { void transPrintEpSet(SEpSet* pEpSet) {
......
...@@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { ...@@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
uvPrepareSendData(smsg, &wb); uvPrepareSendData(smsg, &wb);
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSvrMsg* smsg) { static void uvStartSendResp(SSvrMsg* smsg) {
...@@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { ...@@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
// conn set // conn set
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true; return true;
...@@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { ...@@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
taosThreadJoin(pThrd->thread, NULL); taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transDestroyAsyncPool(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
......
...@@ -809,6 +809,8 @@ class StateEmpty(AnyState): ...@@ -809,6 +809,8 @@ class StateEmpty(AnyState):
] ]
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
if Config.getConfig().ignore_errors: # if we are asked to ignore certain errors, let's not verify CreateDB success.
return
if (self.hasSuccess(tasks, TaskCreateDb) if (self.hasSuccess(tasks, TaskCreateDb)
): # at EMPTY, if there's succes in creating DB ): # at EMPTY, if there's succes in creating DB
if (not self.hasTask(tasks, TaskDropDb)): # and no drop_db tasks if (not self.hasTask(tasks, TaskDropDb)): # and no drop_db tasks
...@@ -2491,7 +2493,7 @@ class MainExec: ...@@ -2491,7 +2493,7 @@ class MainExec:
action='store', action='store',
default=None, default=None,
type=str, type=str,
help='Ignore error codes, comma separated, 0x supported (default: None)') help='Ignore error codes, comma separated, 0x supported, also suppresses certain transition state checks. (default: None)')
parser.add_argument( parser.add_argument(
'-i', '-i',
'--num-replicas', '--num-replicas',
......
...@@ -28,6 +28,7 @@ from util.common import * ...@@ -28,6 +28,7 @@ from util.common import *
from util.constant import * from util.constant import *
from dataclasses import dataclass,field from dataclasses import dataclass,field
from typing import List from typing import List
from datetime import datetime
@dataclass @dataclass
class DataSet: class DataSet:
......
...@@ -84,6 +84,7 @@ echo "SIM_DIR : $SIM_DIR" ...@@ -84,6 +84,7 @@ echo "SIM_DIR : $SIM_DIR"
echo "CODE_DIR : $CODE_DIR" echo "CODE_DIR : $CODE_DIR"
echo "CFG_DIR : $CFG_DIR" echo "CFG_DIR : $CFG_DIR"
rm -rf $SIM_DIR/*
rm -rf $LOG_DIR rm -rf $LOG_DIR
rm -rf $CFG_DIR rm -rf $CFG_DIR
......
...@@ -214,6 +214,24 @@ class TDTestCase: ...@@ -214,6 +214,24 @@ class TDTestCase:
tdSql.checkRows((row_num-i)*tb_num) tdSql.checkRows((row_num-i)*tb_num)
for j in range(tb_num): for j in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data) self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data)
for i in range(row_num):
tdSql.execute(f'delete from {tbname} where ts between {self.ts} and {self.ts+i}')
tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache')
tdSql.query(f'select {col_name} from {tbname}')
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(row_num - i-1)
self.insert_base_data(col_type,tbname,row_num,base_data)
elif tb_type == 'stb':
tdSql.checkRows(tb_num*(row_num - i-1))
for j in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data)
tdSql.execute(f'delete from {tbname} where ts between {self.ts+i+1} and {self.ts}')
tdSql.query(f'select {col_name} from {tbname}')
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(row_num)
elif tb_type == 'stb':
tdSql.checkRows(tb_num*row_num)
def delete_error(self,tbname,column_name,column_type,base_data): def delete_error(self,tbname,column_name,column_type,base_data):
for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']: for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']:
if 'binary' in column_type.lower(): if 'binary' in column_type.lower():
...@@ -222,6 +240,7 @@ class TDTestCase: ...@@ -222,6 +240,7 @@ class TDTestCase:
tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''') tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''')
else: else:
tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}')
def delete_data_ntb(self): def delete_data_ntb(self):
tdSql.execute(f'create database if not exists {self.dbname}') tdSql.execute(f'create database if not exists {self.dbname}')
tdSql.execute(f'use {self.dbname}') tdSql.execute(f'use {self.dbname}')
......
...@@ -81,39 +81,63 @@ class TDTestCase: ...@@ -81,39 +81,63 @@ class TDTestCase:
if col_type.lower() == 'double': if col_type.lower() == 'double':
for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.DOUBLE_MIN,1.1*constant.DOUBLE_MAX]: for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.DOUBLE_MIN,1.1*constant.DOUBLE_MAX]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'float': elif col_type.lower() == 'float':
for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.FLOAT_MIN,1.1*constant.FLOAT_MAX]: for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.FLOAT_MIN,1.1*constant.FLOAT_MAX]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif 'binary' in col_type.lower() or 'nchar' in col_type.lower(): elif 'binary' in col_type.lower() or 'nchar' in col_type.lower():
for error_value in [tdCom.getLongName(str_length)]: for error_value in [tdCom.getLongName(str_length)]:
tdSql.error(f'insert into {tbname} values({self.ts},"{error_value}")') tdSql.error(f'insert into {tbname} values({self.ts},"{error_value}")')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'bool': elif col_type.lower() == 'bool':
for error_value in [tdCom.getLongName(self.str_length)]: for error_value in [tdCom.getLongName(self.str_length)]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'tinyint': elif col_type.lower() == 'tinyint':
for error_value in [constant.TINYINT_MIN-1,constant.TINYINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.TINYINT_MIN-1,constant.TINYINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'smallint': elif col_type.lower() == 'smallint':
for error_value in [constant.SMALLINT_MIN-1,constant.SMALLINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.SMALLINT_MIN-1,constant.SMALLINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'int': elif col_type.lower() == 'int':
for error_value in [constant.INT_MIN-1,constant.INT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.INT_MIN-1,constant.INT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'bigint': elif col_type.lower() == 'bigint':
for error_value in [constant.BIGINT_MIN-1,constant.BIGINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.BIGINT_MIN-1,constant.BIGINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'tinyint unsigned': elif col_type.lower() == 'tinyint unsigned':
for error_value in [constant.TINYINT_UN_MIN-1,constant.TINYINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.TINYINT_UN_MIN-1,constant.TINYINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'smallint unsigned': elif col_type.lower() == 'smallint unsigned':
for error_value in [constant.SMALLINT_UN_MIN-1,constant.SMALLINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.SMALLINT_UN_MIN-1,constant.SMALLINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'int unsigned': elif col_type.lower() == 'int unsigned':
for error_value in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
elif col_type.lower() == 'bigint unsigned': elif col_type.lower() == 'bigint unsigned':
for error_value in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: for error_value in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]:
tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') tdSql.error(f'insert into {tbname} values({self.ts},{error_value})')
if tb_type == 'ctb':
tdSql.error(f'insert into {stbname} values({self.ts},{error_value})')
tdSql.execute(f'drop table {tbname}') tdSql.execute(f'drop table {tbname}')
if tb_type == 'ctb': if tb_type == 'ctb':
tdSql.execute(f'drop table {stbname}') tdSql.execute(f'drop table {stbname}')
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import random
import string
from numpy import logspace
from util import constant
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.dbname = 'db_test'
self.ntbname = 'ntb'
self.stbname = 'stb'
self.rowNum = 10
self.tbnum = 5
self.ts = 1537146000000
self.str_length = 20
self.column_dict = {
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': f'binary({self.str_length})',
'col13': f'nchar({self.str_length})'
}
self.tinyint_val = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX)
self.smallint_val = random.randint(constant.SMALLINT_MIN,constant.SMALLINT_MAX)
self.int_val = random.randint(constant.INT_MIN,constant.INT_MAX)
self.bigint_val = random.randint(constant.BIGINT_MIN,constant.BIGINT_MAX)
self.untingint_val = random.randint(constant.TINYINT_UN_MIN,constant.TINYINT_UN_MAX)
self.unsmallint_val = random.randint(constant.SMALLINT_UN_MIN,constant.SMALLINT_UN_MAX)
self.unint_val = random.randint(constant.INT_UN_MIN,constant.INT_MAX)
self.unbigint_val = random.randint(constant.BIGINT_UN_MIN,constant.BIGINT_UN_MAX)
self.float_val = random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX)
self.double_val = random.uniform(constant.DOUBLE_MIN*(1E-300),constant.DOUBLE_MAX*(1E-300))
self.bool_val = random.randint(0,2)%2
self.binary_val = tdCom.getLongName(random.randint(0,self.str_length))
self.nchar_val = tdCom.getLongName(random.randint(0,self.str_length))
self.data = {
'tinyint':self.tinyint_val,
'smallint':self.smallint_val,
'int':self.int_val,
'bigint':self.bigint_val,
'tinyint unsigned':self.untingint_val,
'smallint unsigned':self.unsmallint_val,
'int unsigned':self.unint_val,
'bigint unsigned':self.unbigint_val,
'bool':self.bool_val,
'float':self.float_val,
'double':self.double_val,
'binary':self.binary_val,
'nchar':self.nchar_val
}
def update_data(self,dbname,tbname,tb_num,rows,values,col_type):
sql = f'insert into '
for j in range(tb_num):
sql += f'{dbname}.{tbname}_{j} values'
for i in range(rows):
if 'binary' in col_type.lower() or 'nchar' in col_type.lower():
sql += f'({self.ts+i},"{values}")'
else:
sql += f'({self.ts+i},{values})'
sql += ' '
tdSql.execute(sql)
def insert_data(self,col_type,tbname,rows,data):
for i in range(rows):
if col_type.lower() == 'tinyint':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["tinyint"]})')
elif col_type.lower() == 'smallint':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["smallint"]})')
elif col_type.lower() == 'int':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["int"]})')
elif col_type.lower() == 'bigint':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bigint"]})')
elif col_type.lower() == 'tinyint unsigned':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["tinyint unsigned"]})')
elif col_type.lower() == 'smallint unsigned':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["smallint unsigned"]})')
elif col_type.lower() == 'int unsigned':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["int unsigned"]})')
elif col_type.lower() == 'bigint unsigned':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bigint unsigned"]})')
elif col_type.lower() == 'bool':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bool"]})')
elif col_type.lower() == 'float':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["float"]})')
elif col_type.lower() == 'double':
tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["double"]})')
elif 'binary' in col_type.lower():
tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{data['binary']}")''')
elif 'nchar' in col_type.lower():
tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{data['nchar']}")''')
def data_check(self,dbname,tbname,tbnum,rownum,data,col_name,col_type):
if 'binary' in col_type.lower():
self.update_data(dbname,f'{tbname}',tbnum,rownum,data['binary'],col_type)
elif 'nchar' in col_type.lower():
self.update_data(dbname,f'{tbname}',tbnum,rownum,data['nchar'],col_type)
else:
self.update_data(dbname,f'{tbname}',tbnum,rownum,data[col_type],col_type)
tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache')
for i in range(self.tbnum):
tdSql.query(f'select {col_name} from {dbname}.{tbname}_{i}')
for j in range(rownum):
if col_type.lower() == 'float' or col_type.lower() == 'double':
if abs(tdSql.queryResult[j][0] - data[col_type]) / data[col_type] <= 0.0001:
tdSql.checkEqual(tdSql.queryResult[j][0],tdSql.queryResult[j][0])
elif 'binary' in col_type.lower():
tdSql.checkEqual(tdSql.queryResult[j][0],data['binary'])
elif 'nchar' in col_type.lower():
tdSql.checkEqual(tdSql.queryResult[j][0],data['nchar'])
else:
tdSql.checkEqual(tdSql.queryResult[j][0],data[col_type])
def update_data_ntb(self):
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database {self.dbname}')
tdSql.execute(f'use {self.dbname}')
for col_name,col_type in self.column_dict.items():
for i in range(self.tbnum):
tdSql.execute(f'create table {self.dbname}.{self.ntbname}_{i} (ts timestamp,{col_name} {col_type})')
for j in range(self.rowNum):
tdSql.execute(f'insert into {self.dbname}.{self.ntbname}_{i} values({self.ts+j},null)' )
tdSql.execute(f'flush database {self.dbname}')
tdSql.execute('reset query cache')
self.data_check(self.dbname,self.ntbname,self.tbnum,self.rowNum,self.data,col_name,col_type)
for i in range(self.tbnum):
tdSql.execute(f'drop table {self.ntbname}_{i}')
def update_data_ctb(self):
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database {self.dbname}')
tdSql.execute(f'use {self.dbname}')
for col_name,col_type in self.column_dict.items():
tdSql.execute(f'create table {self.dbname}.{self.stbname} (ts timestamp,{col_name} {col_type}) tags(t0 int)')
for i in range(self.tbnum):
tdSql.execute(f'create table {self.dbname}.{self.stbname}_{i} using {self.dbname}.{self.stbname} tags(1)')
for j in range(self.rowNum):
tdSql.execute(f'insert into {self.dbname}.{self.stbname}_{i} values({self.ts+j},null)' )
tdSql.execute(f'flush database {self.dbname}')
tdSql.execute('reset query cache')
self.data_check(self.dbname,self.stbname,self.tbnum,self.rowNum,self.data,col_name,col_type)
tdSql.execute(f'drop table {self.stbname}')
def run(self):
self.update_data_ntb()
self.update_data_ctb()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -94,17 +94,15 @@ class TDTestCase: ...@@ -94,17 +94,15 @@ class TDTestCase:
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})')
#!TODO tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})')
# tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})') tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
# tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
tdSql.query(f'select count(tbname) from {self.stbname}') tdSql.query(f'select count(tbname) from {self.stbname}')
tdSql.checkRows(0) tdSql.checkRows(0)
tdSql.execute('flush database db') tdSql.execute('flush database db')
tdSql.query(f'select count(tbname) from {self.stbname}') tdSql.query(f'select count(tbname) from {self.stbname}')
tdSql.checkRows(0) tdSql.checkRows(0)
#!TODO tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})')
# tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})') tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
# tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
for i in range(self.tbnum): for i in range(self.tbnum):
self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum) self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum)
self.count_query_stb(self.column_dict,self.tag_dict,self.stbname,self.tbnum,self.rowNum) self.count_query_stb(self.column_dict,self.tag_dict,self.stbname,self.tbnum,self.rowNum)
......
...@@ -20,6 +20,8 @@ import threading ...@@ -20,6 +20,8 @@ import threading
import requests import requests
import time import time
# import socketfrom # import socketfrom
import json
import toml
import taos import taos
from util.log import * from util.log import *
...@@ -207,7 +209,7 @@ class TMQCom: ...@@ -207,7 +209,7 @@ class TMQCom:
def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0): def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0):
for _ in range(count): for _ in range(count):
create_ctable_sql = f'drop table {dbname}.{default_ctbname_prefix}{ctbStartIdx};' create_ctable_sql = f'drop table if exists {dbname}.{default_ctbname_prefix}{ctbStartIdx};'
ctbStartIdx += 1 ctbStartIdx += 1
tdLog.info("drop ctb sql: %s"%create_ctable_sql) tdLog.info("drop ctb sql: %s"%create_ctable_sql)
tsql.execute(create_ctable_sql) tsql.execute(create_ctable_sql)
...@@ -503,6 +505,37 @@ class TMQCom: ...@@ -503,6 +505,37 @@ class TMQCom:
break break
return return
def create_ntable(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=None, colPrefix='c', tblNum=1, **kwargs):
tb_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
tb_params += f'{param} "{value}" '
column_type_str = tdCom.gen_column_type_str(colPrefix, column_elm_list)
for _ in range(tblNum):
create_table_sql = f'create table {dbname}.{tbname_prefix}{tbname_index_start_num} ({column_type_str}) {tb_params};'
tbname_index_start_num += 1
tsql.execute(create_table_sql)
def insert_rows_into_ntbl(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=None, startTs=None, tblNum=1, rows=1):
if startTs is None:
startTs = tdCom.genTs()[0]
for tblIdx in range(tblNum):
for rowIdx in range(rows):
column_value_list = tdCom.gen_column_value_list(column_ele_list, f'{startTs}+{rowIdx}s')
column_value_str = ''
idx = 0
for column_value in column_value_list:
if isinstance(column_value, str) and idx != 0:
column_value_str += f'"{column_value}", '
else:
column_value_str += f'{column_value}, '
idx += 1
column_value_str = column_value_str.rstrip()[:-1]
insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
tsql.execute(insert_sql)
def close(self): def close(self):
self.cursor.close() self.cursor.close()
......
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 4
self.ctbNum = 100
self.rowsPerTbl = 10
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def waitSubscriptionExit(self, max_wait_count=20):
wait_cnt = 0
while (wait_cnt < max_wait_count):
tdSql.query("show subscriptions")
if tdSql.getRows() == 0:
break
else:
time.sleep(1)
wait_cnt += 1
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
# drop some ntbs
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ntb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0,
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdLog.info("start create database....")
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("start create normal tables....")
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("create topics from database")
topicFromDb = 'topic_dbt'
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
if self.snapshot == 0:
consumerId = 0
elif self.snapshot == 1:
consumerId = 1
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
topicList = topicFromDb
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop some ntables")
# drop 1/4 ctbls from half offset
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
tdLog.printNoPrefix("======== test case 1 end ...... ")
# drop some ntbs and create some new ntbs
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ntb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0,
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdLog.info("start create database....")
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("start create normal tables....")
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("create topics from database")
topicFromDb = 'topic_dbt'
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
if self.snapshot == 0:
consumerId = 2
elif self.snapshot == 1:
consumerId = 3
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromDb
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop some ntables")
# drop 1/4 ctbls from half offset
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
tdLog.info("start create some new normal tables....")
paraDict["ctbPrefix"] = 'newCtb'
paraDict["ctbNum"] = self.ctbNum
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into these new normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0
# self.tmqCase1()
self.tmqCase2()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
# self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -32,6 +32,9 @@ python3 ./test.py -f 1-insert/block_wise.py ...@@ -32,6 +32,9 @@ python3 ./test.py -f 1-insert/block_wise.py
python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/create_retentions.py
python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 1-insert/update_data_muti_rows.py
python3 ./test.py -f 2-query/abs.py python3 ./test.py -f 2-query/abs.py
python3 ./test.py -f 2-query/abs.py -R python3 ./test.py -f 2-query/abs.py -R
python3 ./test.py -f 2-query/and_or_for_byte.py python3 ./test.py -f 2-query/and_or_for_byte.py
...@@ -59,7 +62,9 @@ python3 ./test.py -f 2-query/char_length.py -R ...@@ -59,7 +62,9 @@ python3 ./test.py -f 2-query/char_length.py -R
python3 ./test.py -f 2-query/check_tsdb.py python3 ./test.py -f 2-query/check_tsdb.py
python3 ./test.py -f 2-query/check_tsdb.py -R python3 ./test.py -f 2-query/check_tsdb.py -R
# jira python3 ./test.py -f 1-insert/update_data.py
python3 ./test.py -f 1-insert/update_data.py
python3 ./test.py -f 1-insert/delete_data.py python3 ./test.py -f 1-insert/delete_data.py
python3 ./test.py -f 2-query/db.py python3 ./test.py -f 2-query/db.py
......
Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a Subproject commit 69b558ccbfe54a4407fe23eeae2e67c540f59e55
Subproject commit df8678f070e3f707faf59baebec90065f6e1268b Subproject commit d8f19ede56f1f489c5d2ac8f963cced01e68ecef
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册