提交 e39a5c3f 编写于 作者: S Shengliang Guan

Merge branch '3.0' into fix/coverity_alex

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "3.0.1.7") SET(TD_VER_NUMBER "3.0.1.8")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)
......
...@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w ...@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.0.1.8
<Release type="tdengine" version="3.0.1.8" />
## 3.0.1.7 ## 3.0.1.7
<Release type="tdengine" version="3.0.1.7" /> <Release type="tdengine" version="3.0.1.7" />
......
...@@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat ...@@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 2.3.0
<Release type="tools" version="2.3.0" />
## 2.2.9 ## 2.2.9
<Release type="tools" version="2.2.9" /> <Release type="tools" version="2.2.9" />
......
...@@ -72,7 +72,7 @@ SHOW STREAMS; ...@@ -72,7 +72,7 @@ SHOW STREAMS;
若要展示更详细的信息,可以使用: 若要展示更详细的信息,可以使用:
```sql ```sql
SELECT * from performance_schema.`perf_streams`; SELECT * from information_schema.`ins_streams`;
``` ```
## 流式计算的触发模式 ## 流式计算的触发模式
......
...@@ -77,7 +77,7 @@ sudo -u grafana grafana-cli plugins install tdengine-datasource ...@@ -77,7 +77,7 @@ sudo -u grafana grafana-cli plugins install tdengine-datasource
或者从 [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) 或 [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) 下载 .zip 文件到本地并解压到 Grafana 插件目录。命令行下载示例如下: 或者从 [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) 或 [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) 下载 .zip 文件到本地并解压到 Grafana 插件目录。命令行下载示例如下:
```bash ```bash
GF_VERSION=3.2.2 GF_VERSION=3.2.7
# from GitHub # from GitHub
wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip
# from Grafana # from Grafana
......
...@@ -10,6 +10,11 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do ...@@ -10,6 +10,11 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.0.1.8
<Release type="tdengine" version="3.0.1.8" />
## 3.0.1.7 ## 3.0.1.7
<Release type="tdengine" version="3.0.1.7" /> <Release type="tdengine" version="3.0.1.7" />
......
...@@ -10,6 +10,10 @@ taosTools 各版本安装包下载链接如下: ...@@ -10,6 +10,10 @@ taosTools 各版本安装包下载链接如下:
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 2.3.0
<Release type="tools" version="2.3.0" />
## 2.2.9 ## 2.2.9
<Release type="tools" version="2.2.9" /> <Release type="tools" version="2.2.9" />
......
...@@ -140,15 +140,40 @@ typedef struct { ...@@ -140,15 +140,40 @@ typedef struct {
int8_t type; int8_t type;
} SStreamCheckpoint; } SStreamCheckpoint;
typedef struct {
int8_t type;
} SStreamTaskDestroy;
typedef struct { typedef struct {
int8_t type; int8_t type;
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SStreamTrigger; } SStreamTrigger;
typedef struct SStreamQueueNode SStreamQueueNode;
struct SStreamQueueNode {
SStreamQueueItem* item;
SStreamQueueNode* next;
};
typedef struct {
SStreamQueueNode* head;
int64_t size;
} SStreamQueueRes;
void streamFreeQitem(SStreamQueueItem* data);
bool streamQueueResEmpty(const SStreamQueueRes* pRes);
int64_t streamQueueResSize(const SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes);
void streamQueueResClear(SStreamQueueRes* pRes);
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode);
typedef struct {
SStreamQueueNode* pHead;
} SStreamQueue1;
bool streamQueueHasTask(const SStreamQueue1* pQueue);
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
typedef struct { typedef struct {
STaosQueue* queue; STaosQueue* queue;
STaosQall* qall; STaosQall* qall;
......
...@@ -324,15 +324,15 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -324,15 +324,15 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} }
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqHbReq req = {0}; SMqHbReq req = {0};
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
int64_t consumerId = req.consumerId; int64_t consumerId = req.consumerId;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("consumer %" PRId64 " not exist", consumerId); mError("consumer %" PRId64 " not exist", consumerId);
...@@ -363,17 +363,17 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { ...@@ -363,17 +363,17 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
} }
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqAskEpReq req = {0}; SMqAskEpReq req = {0};
SMqAskEpRsp rsp = {0}; SMqAskEpRsp rsp = {0};
if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
int64_t consumerId = req.consumerId; int64_t consumerId = req.consumerId;
int32_t epoch = req.epoch; int32_t epoch = req.epoch;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
...@@ -457,6 +457,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -457,6 +457,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
if (topicEp.vgs == NULL) { if (topicEp.vgs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
goto FAIL; goto FAIL;
} }
......
...@@ -317,9 +317,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -317,9 +317,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDbObj != NULL); ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pDbObj);
bool multiTarget = pDbObj->cfg.numOfVgroups > 1; bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
sdbRelease(pSdb, pDbObj);
if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
/*if (true) {*/ /*if (true) {*/
...@@ -451,7 +451,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -451,7 +451,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) { if (pEpInfo == NULL) {
ASSERT(0);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
......
...@@ -782,6 +782,7 @@ SUB_DECODE_OVER: ...@@ -782,6 +782,7 @@ SUB_DECODE_OVER:
return NULL; return NULL;
} }
mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
return pRow; return pRow;
} }
...@@ -928,6 +929,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) ...@@ -928,6 +929,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
action.msgType = TDMT_VND_TMQ_DELETE_SUB; action.msgType = TDMT_VND_TMQ_DELETE_SUB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
sdbRelease(pSdb, pSub);
return -1; return -1;
} }
} }
...@@ -936,6 +938,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) ...@@ -936,6 +938,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
goto END; goto END;
} }
sdbRelease(pSdb, pSub);
} }
code = 0; code = 0;
......
...@@ -660,6 +660,13 @@ _end: ...@@ -660,6 +660,13 @@ _end:
return code; return code;
} }
static void tdBlockDataDestroy(SArray *pBlockArr) {
for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) {
blockDataDestroy(taosArrayGetP(pBlockArr, i));
}
taosArrayDestroy(pBlockArr);
}
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid) { int64_t suid) {
SArray *pResList = taosArrayInit(1, POINTER_BYTES); SArray *pResList = taosArrayInit(1, POINTER_BYTES);
...@@ -701,38 +708,42 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -701,38 +708,42 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
#endif #endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSDataBlock *output = taosArrayGetP(pResList, i); SSDataBlock *output = taosArrayGetP(pResList, i);
smaDebug("result block, uid:%"PRIu64", groupid:%"PRIu64", rows:%d", output->info.uid, output->info.groupId, smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.uid, output->info.groupId,
output->info.rows); output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%"PRIu64", level %" PRIi8 " failed since %s", SMA_VID(pSma), smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8
suid, output->info.groupId, pItem->level, terrstr()); " failed since %s",
SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr());
goto _err; goto _err;
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s", smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8
" failed since %s",
SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr());
goto _err; goto _err;
} }
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64
SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen)); " len %" PRIu32,
SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version,
htonl(pReq->header.contLen));
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
} }
} }
taosArrayDestroy(pResList); tdBlockDataDestroy(pResList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
taosArrayDestroy(pResList); tdBlockDataDestroy(pResList);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -820,8 +831,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { ...@@ -820,8 +831,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
ERsmaExecType type, int8_t level) { ERsmaExecType type, int8_t level) {
int32_t idx = level - 1; int32_t idx = level - 1;
void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
if (!qTaskInfo) { if (!qTaskInfo) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
pInfo->suid); pInfo->suid);
......
...@@ -379,7 +379,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m ...@@ -379,7 +379,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) { if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0);
return -1; return -1;
} }
} }
......
...@@ -82,12 +82,17 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* ...@@ -82,12 +82,17 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
static void releaseQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables);
static void destroyFillOperatorInfo(void* param); static void destroyFillOperatorInfo(void* param);
static void destroyProjectOperatorInfo(void* param); static void destroyAggOperatorInfo(void* param);
static void destroySortOperatorInfo(void* param); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void destroyAggOperatorInfo(void* param); static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void destroyIntervalOperatorInfo(void* param); static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey);
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
void setOperatorCompleted(SOperatorInfo* pOperator) { void setOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -129,9 +134,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, ...@@ -129,9 +134,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL; SFilePage* pData = NULL;
...@@ -362,9 +364,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo ...@@ -362,9 +364,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo
} }
} }
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) { static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
SqlFunctionCtx* pCtx = pExprSup->pCtx; SqlFunctionCtx* pCtx = pExprSup->pCtx;
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
...@@ -996,9 +995,6 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO ...@@ -996,9 +995,6 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
} }
} }
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
int32_t status);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) { void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) { if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return; return;
...@@ -1564,9 +1560,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -1564,9 +1560,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
// todo add more information about exchange operation // todo add more information about exchange operation
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
...@@ -1641,11 +1634,16 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -1641,11 +1634,16 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
} }
// the downstream operator may return with error code, so let's check the code before generating results.
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return TSDB_CODE_SUCCESS; return pTaskInfo->code;
} }
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
...@@ -1684,7 +1682,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -1684,7 +1682,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pInfo->pRes; return (rows == 0) ? NULL : pInfo->pRes;
} }
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
......
...@@ -915,33 +915,39 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ...@@ -915,33 +915,39 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
} }
pDest->info.rows++; pDest->info.rows++;
if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) { if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); void* tbname = NULL;
SSDataBlock* pResBlock = createDataBlock(); if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); tdbFree(tbname);
taosArrayPush(pResBlock->pDataBlock, &data);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetVarData(pCol, 0);
// TODO check tbname validity
if (pData != (void*)-1) {
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(pDest->info.parTbName, varDataVal(pData), len);
/*pDest->info.parTbName[len + 1] = 0;*/
} else { } else {
pDest->info.parTbName[0] = 0; SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex);
} SSDataBlock* pResBlock = createDataBlock();
if (pParInfo->groupId && pDest->info.parTbName[0]) { pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
taosArrayPush(pResBlock->pDataBlock, &data);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetVarData(pCol, 0);
// TODO check tbname validity
if (pData != (void*)-1) {
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(pDest->info.parTbName, varDataVal(pData), len);
/*pDest->info.parTbName[len + 1] = 0;*/
} else {
pDest->info.parTbName[0] = 0;
}
if (pParInfo->groupId && pDest->info.parTbName[0]) {
streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName);
}
/*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/
blockDataDestroy(pTmpBlock);
blockDataDestroy(pResBlock);
} }
/*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/
blockDataDestroy(pTmpBlock);
blockDataDestroy(pResBlock);
} }
} }
taosArrayDestroy(pParInfo->rowIds); taosArrayDestroy(pParInfo->rowIds);
......
...@@ -163,8 +163,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro ...@@ -163,8 +163,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf, SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) { if (p1 == NULL) {
return NULL; return NULL;
...@@ -306,7 +306,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo ...@@ -306,7 +306,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) { uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
pCost->totalBlocks += 1; pCost->totalBlocks += 1;
...@@ -1312,6 +1312,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS ...@@ -1312,6 +1312,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
varDataSetLen(tbname, strlen(varDataVal(tbname))); varDataSetLen(tbname, strlen(varDataVal(tbname)));
tdbFree(parTbname);
} }
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
tbname[0] == 0 ? NULL : tbname); tbname[0] == 0 ? NULL : tbname);
...@@ -1510,10 +1511,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1510,10 +1511,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) { // ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
// reset the error code.
terrno = 0;
} }
if (filter) { if (filter) {
...@@ -1928,6 +1933,7 @@ FETCH_NEXT_BLOCK: ...@@ -1928,6 +1933,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->validBlockIndex >= totBlockNum) { if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo); doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, consume block %d", totBlockNum);
return NULL; return NULL;
} }
...@@ -2562,7 +2568,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2562,7 +2568,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
uint32_t status = 0; uint32_t status = 0;
loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
...@@ -2893,7 +2899,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2893,7 +2899,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error; goto _error;
} }
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
......
...@@ -762,12 +762,10 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI ...@@ -762,12 +762,10 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI
resetPrevAndNextWindow(pFillSup, pState); resetPrevAndNextWindow(pFillSup, pState);
SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey key = {.ts = ts, .groupId = groupId};
// void* curVal = NULL;
int32_t curVLen = 0; int32_t curVLen = 0;
int32_t code = streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen); int32_t code = streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen);
ASSERT(code == TSDB_CODE_SUCCESS); ASSERT(code == TSDB_CODE_SUCCESS);
pFillSup->cur.key = key.ts; pFillSup->cur.key = key.ts;
// pFillSup->cur.pRowVal = curVal;
} }
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) { void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
...@@ -952,6 +950,19 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS ...@@ -952,6 +950,19 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
} }
} }
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
for (int32_t i = pFillSup->numOfFillCols; i < pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
int32_t slotId = GET_DEST_SLOT_ID(pFillCol);
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
SResultCellData* pCurCell = getResultCell(&pFillSup->cur, slotId);
pCell->isNull = pCurCell->isNull;
if (!pCurCell->isNull) {
memcpy(pCell->pData, pCurCell->pData, pCell->bytes);
}
}
}
void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup, void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup,
SStreamFillInfo* pFillInfo) { SStreamFillInfo* pFillInfo) {
pFillInfo->preRowKey = pFillSup->cur.key; pFillInfo->preRowKey = pFillSup->cur.key;
...@@ -993,6 +1004,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS ...@@ -993,6 +1004,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
} }
copyNotFillExpData(pFillSup, pFillInfo);
} break; } break;
case TSDB_FILL_PREV: { case TSDB_FILL_PREV: {
if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) || if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) ||
......
...@@ -3376,7 +3376,8 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -3376,7 +3376,8 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
int64_t* pts = (int64_t*)pInput->pPTS->pData; int64_t* pts = (int64_t*)pInput->pPTS->pData;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i); bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
TSKEY cts = pts[i]; TSKEY cts = pts[i];
numOfElems++; numOfElems++;
......
...@@ -699,8 +699,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou ...@@ -699,8 +699,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
} else { } else {
for (int32_t m = 0; m < node->pParameterList->length; m++) { for (int32_t m = 0; m < node->pParameterList->length; m++) {
output->status = sifMergeCond(node->condType, output->status, params[m].status); output->status = sifMergeCond(node->condType, output->status, params[m].status);
taosArrayDestroy(params[m].result); // taosArrayDestroy(params[m].result);
params[m].result = NULL; // params[m].result = NULL;
} }
} }
_return: _return:
...@@ -857,9 +857,15 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { ...@@ -857,9 +857,15 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR); SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
*status = res->status; *status = res->status;
sifFreeParam(res); sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
void *iter = taosHashIterate(ctx.pRes, NULL);
while (iter != NULL) {
SIFParam *data = (SIFParam *)iter;
sifFreeParam(data);
iter = taosHashIterate(ctx.pRes, iter);
}
taosHashCleanup(ctx.pRes); taosHashCleanup(ctx.pRes);
return code; return code;
} }
......
...@@ -603,7 +603,7 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell ...@@ -603,7 +603,7 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell
bool *equal = (bool *)colDataGetData(pComp->columnData, rowIdx); bool *equal = (bool *)colDataGetData(pComp->columnData, rowIdx);
if (*equal) { if (*equal) {
bool isNull = colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); bool isNull = colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0));
char *pData = isNull ? NULL : colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); char *pData = isNull ? NULL : colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0));
colDataAppend(output->columnData, rowIdx, pData, isNull); colDataAppend(output->columnData, rowIdx, pData, isNull);
...@@ -617,7 +617,7 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell ...@@ -617,7 +617,7 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell
} }
if (pElse) { if (pElse) {
bool isNull = colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); bool isNull = colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0));
char *pData = isNull ? NULL : colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); char *pData = isNull ? NULL : colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0));
colDataAppend(output->columnData, rowIdx, pData, isNull); colDataAppend(output->columnData, rowIdx, pData, isNull);
...@@ -666,7 +666,7 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell *pCe ...@@ -666,7 +666,7 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell *pCe
bool *whenValue = (bool *)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? rowIdx : 0)); bool *whenValue = (bool *)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? rowIdx : 0));
if (*whenValue) { if (*whenValue) {
bool isNull = colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); bool isNull = colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0));
char *pData = isNull ? NULL : colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); char *pData = isNull ? NULL : colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0));
colDataAppend(output->columnData, rowIdx, pData, isNull); colDataAppend(output->columnData, rowIdx, pData, isNull);
...@@ -685,7 +685,7 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell *pCe ...@@ -685,7 +685,7 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell *pCe
} }
if (pElse) { if (pElse) {
bool isNull = colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); bool isNull = colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0));
char *pData = isNull ? NULL : colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); char *pData = isNull ? NULL : colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0));
colDataAppend(output->columnData, rowIdx, pData, isNull); colDataAppend(output->columnData, rowIdx, pData, isNull);
...@@ -1210,6 +1210,7 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) { ...@@ -1210,6 +1210,7 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
SScalarParam output = {0}; SScalarParam output = {0};
ctx->code = sclExecOperator(node, ctx, &output); ctx->code = sclExecOperator(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
...@@ -1358,6 +1359,7 @@ EDealRes sclWalkOperator(SNode *pNode, SScalarCtx *ctx) { ...@@ -1358,6 +1359,7 @@ EDealRes sclWalkOperator(SNode *pNode, SScalarCtx *ctx) {
ctx->code = sclExecOperator(node, ctx, &output); ctx->code = sclExecOperator(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
......
此差异已折叠。
...@@ -47,7 +47,6 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov ...@@ -47,7 +47,6 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SEpSet* pEpSet); SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -45,3 +45,59 @@ void streamQueueClose(SStreamQueue* queue) { ...@@ -45,3 +45,59 @@ void streamQueueClose(SStreamQueue* queue) {
taosCloseQueue(queue->queue); taosCloseQueue(queue->queue);
taosMemoryFree(queue); taosMemoryFree(queue);
} }
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
//
return true;
}
int64_t streamQueueResSize(const SStreamQueueRes* pRes) { return pRes->size; }
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes) { return pRes->head; }
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes) {
SStreamQueueNode* pRet = pRes->head;
pRes->head = pRes->head->next;
return pRet;
}
void streamQueueResClear(SStreamQueueRes* pRes) {
while (pRes->head) {
SStreamQueueNode* pNode = pRes->head;
streamFreeQitem(pRes->head->item);
pRes->head = pNode;
}
}
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pTail) {
int64_t size = 0;
SStreamQueueNode* head = NULL;
while (pTail) {
SStreamQueueNode* pTmp = pTail->next;
pTail->next = head;
head = pTail;
pTail = pTmp;
size++;
}
return (SStreamQueueRes){.head = head, .size = size};
}
bool streamQueueHasTask(const SStreamQueue1* pQueue) { return atomic_load_ptr(pQueue->pHead); }
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem) {
SStreamQueueNode* pNode = taosMemoryMalloc(sizeof(SStreamQueueNode));
pNode->item = pItem;
SStreamQueueNode* pHead = atomic_load_ptr(pQueue->pHead);
while (1) {
pNode->next = pHead;
SStreamQueueNode* pOld = atomic_val_compare_exchange_ptr(pQueue->pHead, pHead, pNode);
if (pOld == pHead) {
break;
}
}
return 0;
}
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
SStreamQueueNode* pNode = atomic_exchange_ptr(pQueue->pHead, NULL);
if (pNode) return streamQueueBuildRes(pNode);
return (SStreamQueueRes){0};
}
...@@ -70,6 +70,7 @@ typedef struct SSyncTimer { ...@@ -70,6 +70,7 @@ typedef struct SSyncTimer {
uint64_t logicClock; uint64_t logicClock;
uint64_t counter; uint64_t counter;
int32_t timerMS; int32_t timerMS;
int64_t timeStamp;
SRaftId destId; SRaftId destId;
int64_t hbDataRid; int64_t hbDataRid;
} SSyncTimer; } SSyncTimer;
......
...@@ -35,6 +35,7 @@ typedef struct SyncTimeout { ...@@ -35,6 +35,7 @@ typedef struct SyncTimeout {
ESyncTimeoutType timeoutType; ESyncTimeoutType timeoutType;
uint64_t logicClock; uint64_t logicClock;
int32_t timerMS; int32_t timerMS;
int64_t timeStamp;
void* data; // need optimized void* data; // need optimized
} SyncTimeout; } SyncTimeout;
......
...@@ -94,11 +94,11 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c ...@@ -94,11 +94,11 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff);
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff);
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
...@@ -115,7 +115,7 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs ...@@ -115,7 +115,7 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s);
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s); void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
......
...@@ -691,6 +691,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa ...@@ -691,6 +691,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa
pSyncTimer->timerMS = pSyncNode->hbBaseLine; pSyncTimer->timerMS = pSyncNode->hbBaseLine;
pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer; pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
pSyncTimer->destId = destId; pSyncTimer->destId = destId;
pSyncTimer->timeStamp = taosGetTimestampMs();
atomic_store_64(&pSyncTimer->logicClock, 0); atomic_store_64(&pSyncTimer->logicClock, 0);
return 0; return 0;
} }
...@@ -704,6 +705,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { ...@@ -704,6 +705,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
pData->rid = syncHbTimerDataAdd(pData); pData->rid = syncHbTimerDataAdd(pData);
} }
pSyncTimer->hbDataRid = pData->rid; pSyncTimer->hbDataRid = pData->rid;
pSyncTimer->timeStamp = taosGetTimestampMs();
pData->syncNodeRid = pSyncNode->rid; pData->syncNodeRid = pSyncNode->rid;
pData->pTimer = pSyncTimer; pData->pTimer = pSyncTimer;
...@@ -1897,7 +1899,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -1897,7 +1899,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
return; return;
} }
sTrace("enqueue ping msg"); // sTrace("enqueue ping msg");
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("failed to sync enqueue ping msg since %s", terrstr()); sError("failed to sync enqueue ping msg since %s", terrstr());
...@@ -2032,6 +2034,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2032,6 +2034,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
// update reset time
int64_t tsNow = taosGetTimestampMs();
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
pSyncTimer->timeStamp = tsNow;
SyncHeartbeat* pSyncMsg = rpcMsg.pCont; SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pData->destId; pSyncMsg->destId = pData->destId;
...@@ -2039,9 +2046,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2039,9 +2046,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0; pSyncMsg->privateTerm = 0;
pSyncMsg->timeStamp = taosGetTimestampMs(); pSyncMsg->timeStamp = tsNow;
// send msg // send msg
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} else { } else {
...@@ -2151,9 +2159,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2151,9 +2159,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeat* pMsg = pRpcMsg->pCont; SyncHeartbeat* pMsg = pRpcMsg->pCont;
int64_t tsMs = taosGetTimestampMs(); int64_t tsMs = taosGetTimestampMs();
char buf[128]; int64_t timeDiff = tsMs - pMsg->timeStamp;
snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); syncLogRecvHeartbeat(ths, pMsg, timeDiff);
syncLogRecvHeartbeat(ths, pMsg, buf);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
...@@ -2163,7 +2170,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2163,7 +2170,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->srcId = ths->myRaftId; pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->pRaftStore->currentTerm; pMsgReply->term = ths->pRaftStore->currentTerm;
pMsgReply->privateTerm = 8864; // magic number pMsgReply->privateTerm = 8864; // magic number
pMsgReply->timeStamp = taosGetTimestampMs(); pMsgReply->timeStamp = tsMs;
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
...@@ -2229,9 +2236,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2229,9 +2236,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeatReply* pMsg = pRpcMsg->pCont; SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
int64_t tsMs = taosGetTimestampMs(); int64_t tsMs = taosGetTimestampMs();
char buf[128]; int64_t timeDiff = tsMs - pMsg->timeStamp;
snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); syncLogRecvHeartbeatReply(ths, pMsg, timeDiff);
syncLogRecvHeartbeatReply(ths, pMsg, buf);
// update last reply time, make decision whether the other node is alive or not // update last reply time, make decision whether the other node is alive or not
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs); syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
......
...@@ -35,6 +35,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l ...@@ -35,6 +35,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
pTimeout->timeoutType = timeoutType; pTimeout->timeoutType = timeoutType;
pTimeout->logicClock = logicClock; pTimeout->logicClock = logicClock;
pTimeout->timerMS = timerMS; pTimeout->timerMS = timerMS;
pTimeout->timeStamp = taosGetTimestampMs();
pTimeout->data = pNode; pTimeout->data = pNode;
return 0; return 0;
} }
......
...@@ -208,7 +208,6 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest ...@@ -208,7 +208,6 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
} }
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) { int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
syncLogSendHeartbeat(pSyncNode, pMsg->pCont, "");
return syncNodeSendMsgById(destId, pSyncNode, pMsg); return syncNodeSendMsgById(destId, pSyncNode, pMsg);
} }
...@@ -231,6 +230,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { ...@@ -231,6 +230,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
pSyncMsg->timeStamp = ts; pSyncMsg->timeStamp = ts;
// send msg // send msg
syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} }
......
...@@ -94,7 +94,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -94,7 +94,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
syncLogRecvRequestVote(ths, pMsg, "not in my config"); syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");
return -1; return -1;
} }
...@@ -133,13 +133,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -133,13 +133,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pReply->voteGranted = grant; pReply->voteGranted = grant;
// trace log // trace log
do { syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");
char logBuf[32]; syncLogSendRequestVoteReply(ths, pReply, "");
snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
syncLogRecvRequestVote(ths, pMsg, logBuf);
syncLogSendRequestVoteReply(ths, pReply, "");
} while (0);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
return 0; return 0;
} }
\ No newline at end of file
...@@ -396,16 +396,25 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df ...@@ -396,16 +396,25 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
} }
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s", if (!(sDebugFlag & DEBUG_TRACE)) return;
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
int64_t tsNow = taosGetTimestampMs();
int64_t timeDIff = tsNow - pMsg->timeStamp;
sNTrace(
pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s",
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, timeDIff, pMsg->data, s);
} }
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
} }
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -417,6 +426,8 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -417,6 +426,8 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
} }
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -427,28 +438,42 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -427,28 +438,42 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
} }
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, if (printX) {
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s", sNTrace(pSyncNode,
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s); "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, x",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
} else {
sNTrace(pSyncNode,
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, timer-elapsed:%" PRId64,
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed);
}
} }
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNTrace(pSyncNode,
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, %s", "}, net elapsed:%" PRId64,
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s); host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timeDiff);
} }
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -457,15 +482,19 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p ...@@ -457,15 +482,19 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
pMsg->term, pMsg->timeStamp, s); pMsg->term, pMsg->timeStamp, s);
} }
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port, sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, net elapsed:%" PRId64,
pMsg->term, pMsg->timeStamp, s); host, port, pMsg->term, pMsg->timeStamp, timeDiff);
} }
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -473,6 +502,8 @@ void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs ...@@ -473,6 +502,8 @@ void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs
} }
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -480,6 +511,8 @@ void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs ...@@ -480,6 +511,8 @@ void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs
} }
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -488,6 +521,8 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot ...@@ -488,6 +521,8 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
} }
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -496,6 +531,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot ...@@ -496,6 +531,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
} }
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -507,6 +544,8 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p ...@@ -507,6 +544,8 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
} }
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -519,6 +558,8 @@ void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p ...@@ -519,6 +558,8 @@ void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
} }
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -530,6 +571,8 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs ...@@ -530,6 +571,8 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
} }
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -541,6 +584,8 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs ...@@ -541,6 +584,8 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
} }
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -553,6 +598,8 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs ...@@ -553,6 +598,8 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
} }
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -563,16 +610,28 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs ...@@ -563,16 +610,28 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
pMsg->dataLen, s); pMsg->dataLen, s);
} }
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char logBuf[256]; char logBuf[256];
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); if (voteGranted == -1) {
sNTrace(pSyncNode,
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
} else {
sNTrace(pSyncNode,
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, granted:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted);
}
} }
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) { void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -581,6 +640,8 @@ void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const ...@@ -581,6 +640,8 @@ void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const
} }
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -589,6 +650,8 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl ...@@ -589,6 +650,8 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
} }
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
......
...@@ -195,7 +195,7 @@ static bool uvHandleReq(SSvrConn* pConn) { ...@@ -195,7 +195,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
} }
if (transDecompressMsg((char**)&pHead, msgLen) < 0) { if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
return false; return false;
} }
...@@ -277,10 +277,8 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -277,10 +277,8 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
if (pBuf->len <= TRANS_PACKET_LIMIT) { if (pBuf->len <= TRANS_PACKET_LIMIT) {
while (transReadComplete(pBuf)) { while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
if (true == pBuf->invalid || false == uvHandleReq(conn)) { if (true == pBuf->invalid || false == uvHandleReq(conn)) {
tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pTransInst), conn, tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pTransInst), conn,
conn->dst, conn->src); conn->dst, conn->src);
......
...@@ -418,18 +418,18 @@ ...@@ -418,18 +418,18 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
,,,system-test,python3 ./test.py -f 1-insert/alter_database.py ,,,system-test,python3 ./test.py -f 1-insert/alter_database.py
,,,system-test,python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
,,,system-test,python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py ,,,system-test,python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
,,,system-test,python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
,,,system-test,python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_table.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_table.py
,,n,system-test,python3 ./test.py -f 1-insert/boundary.py ,,n,system-test,python3 ./test.py -f 1-insert/boundary.py
,,n,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py ,,n,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_comment.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_comment.py
,,n,system-test,python3 ./test.py -f 1-insert/time_range_wise.py ,,n,system-test,python3 ./test.py -f 1-insert/time_range_wise.py
,,,system-test,python3 ./test.py -f 1-insert/block_wise.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/block_wise.py
,,,system-test,python3 ./test.py -f 1-insert/create_retentions.py ,,,system-test,python3 ./test.py -f 1-insert/create_retentions.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/mutil_stage.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/mutil_stage.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_param_ttl.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_param_ttl.py
...@@ -622,7 +622,7 @@ ...@@ -622,7 +622,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/join2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/join2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/union1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/union1.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py
,,,system-test,python3 ./test.py -f 2-query/json_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py
,,,system-test,python3 ./test.py -f 2-query/nestedQuery.py ,,,system-test,python3 ./test.py -f 2-query/nestedQuery.py
,,,system-test,python3 ./test.py -f 2-query/nestedQuery_str.py ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_str.py
,,,system-test,python3 ./test.py -f 2-query/nestedQuery_math.py ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_math.py
...@@ -670,23 +670,23 @@ ...@@ -670,23 +670,23 @@
,,,system-test,python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1 ,,,system-test,python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
,,,system-test,python3 ./test.py -f 7-tmq/basic5.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb0.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb3.py ,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb3.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb4.py ,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb4.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb0.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb1.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb2.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb3.py ,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb3.py
,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb4.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb4.py
,,,system-test,python3 ./test.py -f 7-tmq/db.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/db.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqError.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmqError.py
,,,system-test,python3 ./test.py -f 7-tmq/schema.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/schema.py
,,,system-test,python3 ./test.py -f 7-tmq/stbFilter.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilter.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqCheckData.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqCheckData.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqCheckData1.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqCheckData1.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqConsumerGroup.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
...@@ -709,8 +709,8 @@ ...@@ -709,8 +709,8 @@
,,,system-test,python3 ./test.py -f 7-tmq/tmqDnodeRestart1.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDnodeRestart1.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropStb.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDropStb.py
...@@ -718,8 +718,8 @@ ...@@ -718,8 +718,8 @@
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot0.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot0.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot1.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot1.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py ,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
...@@ -768,7 +768,7 @@ ...@@ -768,7 +768,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/json_tag.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 2
...@@ -862,7 +862,7 @@ ...@@ -862,7 +862,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/json_tag.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 3
...@@ -978,8 +978,8 @@ ...@@ -978,8 +978,8 @@
,,,system-test,python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/stablity.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/stablity_1.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/avg.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/avg.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 4
......
...@@ -195,7 +195,7 @@ sql select * from information_schema.ins_stables ...@@ -195,7 +195,7 @@ sql select * from information_schema.ins_stables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
#sql select * from performance_schema.perf_streams #sql select * frominformation_schema.ins_streams
sql select * from information_schema.ins_tables sql select * from information_schema.ins_tables
if $rows <= 0 then if $rows <= 0 then
return -1 return -1
......
...@@ -403,23 +403,46 @@ sql drop database if exists test4; ...@@ -403,23 +403,46 @@ sql drop database if exists test4;
sql create database test4 vgroups 1; sql create database test4 vgroups 1;
sql use test4; sql use test4;
sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));; sql create stable st(ts timestamp,a int,b int,c int, d double, s varchar(20) ) tags(ta int,tb int,tc int);
sql create stream streams4 trigger at_once into streamt4 as select _wstart ts, count(*) c1 from t1 where ts > 1648791210000 and ts < 1648791413000 interval(10s) fill(NULL); sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once into streamt4 as select _wstart ts, count(*) c1, concat(tbname, 'aaa') as pname, timezone() from st where ts > 1648791000000 and ts < 1648793000000 partition by tbname interval(10s) fill(NULL);
sql insert into t1 values(1648791213000,1,2,3,1.0,'aaa'); sql insert into t1 values(1648791213000,1,2,3,1.0,'aaa');
sql insert into t1 values(1648791233000,1,2,3,1.0,'aaa'); sql insert into t1 values(1648791233000,1,2,3,1.0,'aaa');
sql insert into t1 values(1648791273000,1,2,3,1.0,'aaa');
sql insert into t2 values(1648791213000,1,2,3,1.0,'bbb');
sql insert into t2 values(1648791233000,1,2,3,1.0,'bbb');
sql insert into t2 values(1648791273000,1,2,3,1.0,'bbb');
$loop_count = 0 $loop_count = 0
loop4: loop4:
sleep 200 sleep 200
sql select * from streamt4 order by ts; sql select * from streamt4 order by pname, ts;
print ===> $data[0][0] , $data[0][1] , $data[0][2] , $data[0][3]
print ===> $data[1][0] , $data[1][1] , $data[1][2] , $data[1][3]
print ===> $data[2][0] , $data[2][1] , $data[2][2] , $data[2][3]
print ===> $data[3][0] , $data[3][1] , $data[3][2] , $data[3][3]
print ===> $data[4][0] , $data[4][1] , $data[4][2] , $data[4][3]
print ===> $data[5][0] , $data[5][1] , $data[5][2] , $data[5][3]
print ===> $data[6][0] , $data[6][1] , $data[6][2] , $data[6][3]
print ===> $data[7][0] , $data[7][1] , $data[7][2] , $data[7][3]
print ===> $data[8][0] , $data[8][1] , $data[8][2] , $data[8][3]
print ===> $data[9][0] , $data[9][1] , $data[9][2] , $data[9][3]
print ===> $data[10][0] , $data[10][1] , $data[10][2] , $data[10][3]
print ===> $data[11][0] , $data[11][1] , $data[11][2] , $data[11][3]
print ===> $data[12][0] , $data[12][1] , $data[12][2] , $data[12][3]
print ===> $data[13][0] , $data[13][1] , $data[13][2] , $data[13][3]
$loop_count = $loop_count + 1 $loop_count = $loop_count + 1
if $loop_count == 10 then if $loop_count == 10 then
return -1 return -1
endi endi
if $rows != 3 then if $rows != 14 then
print =====rows=$rows print =====rows=$rows
goto loop4 goto loop4
endi endi
...@@ -429,6 +452,67 @@ if $data11 != NULL then ...@@ -429,6 +452,67 @@ if $data11 != NULL then
goto loop4 goto loop4
endi endi
if $data12 != t1aaa then
print =====data12=$data12
goto loop4
endi
if $data13 == NULL then
print =====data13=$data13
goto loop4
endi
if $data32 != t1aaa then
print =====data32=$data32
goto loop4
endi
if $data42 != t1aaa then
print =====data42=$data42
goto loop4
endi
if $data52 != t1aaa then
print =====data52=$data52
goto loop4
endi
if $data81 != NULL then
print =====data81=$data81
goto loop4
endi
if $data82 != t2aaa then
print =====data82=$data82
goto loop4
endi
if $data83 == NULL then
print =====data83=$data83
goto loop4
endi
if $data[10][2] != t2aaa then
print =====data[10][2]=$data[10][2]
goto loop4
endi
if $data[11][2] != t2aaa then
print =====data[11][2]=$data[11][2]
goto loop4
endi
if $data[12][2] != t2aaa then
print =====data[12][2]=$data[12][2]
goto loop4
endi
if $data[12][3] == NULL then
print =====data[12][3]=$data[12][3]
goto loop4
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册