diff --git a/example/src/tmq.c b/example/src/tmq.c index b79d21d05101e4147ce2d24842258bd149802a86..00eb8462f4daa00fb7308b49a42f3b63912f8c7d 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -106,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c853794e868e70b0294f056a55d46a170f6e4f69..c82472eec05c0f5104dd9f3a9697078e27799948 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -206,7 +206,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { int32_t totalVgNum = pOutput->pSub->vgNum; - mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); + mInfo("mq rebalance: subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); @@ -231,6 +231,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("mq rebalance: remove vg %d from consumer %ld", pVgEp->vgId, consumerId); } taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); // put into removed @@ -250,6 +251,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); + mInfo("mq rebalance: remove vg %d from unassigned", pVgEp->vgId); } } @@ -263,6 +265,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR minVgCnt = totalVgNum / afterRebConsumerNum; imbConsumerNum = totalVgNum % afterRebConsumerNum; } + mInfo("mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg", afterRebConsumerNum, + minVgCnt, imbConsumerNum); // 4. first scan: remove consumer more than wanted, put to remove hash int32_t imbCnt = 0; @@ -290,6 +294,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("mq rebalance: remove vg %d from consumer %ld (first scan)", pVgEp->vgId, pConsumerEp->consumerId); } imbCnt++; } @@ -303,6 +308,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("mq rebalance: remove vg %d from consumer %ld (first scan)", pVgEp->vgId, pConsumerEp->consumerId); } } } @@ -319,6 +325,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); + mInfo("mq rebalance: add new consumer %ld", consumerId); } } @@ -343,6 +350,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); + mInfo("mq rebalance: add vg %d to consumer %ld (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } @@ -360,6 +368,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); + mInfo("mq rebalance: add vg %d to consumer %ld (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } else { // if all consumer is removed, put all vg into unassigned @@ -372,6 +381,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ASSERT(pRebOutput->newConsumerId == -1); taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->rebVgs, pRebOutput); + mInfo("mq rebalance: unassign vg %d (second scan)", pRebOutput->pVgEp->vgId); } } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 38dedee5a29b044c89c11b66b00bbbf2160e0903..d0420e1b8448c159f3892df123b1ebf20bbaf0e1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -179,6 +179,7 @@ struct STQ { SHashObj* pStreamTasks; SVnode* pVnode; SWal* pWal; + // TDB* pTdb; }; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 80aa350e4def046867242a4b444d075a42e57da2..c69f7d71d5db15a9085d0b694ec71a9a1afb3ee6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -32,6 +32,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; + /*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/ + /*ASSERT(0);*/ + /*}*/ #if 0 pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 66073b70ebf0ace2ade2ef9bd11bae7b5908c555..35b81b135f20769b63220c743997246c4cceb160 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -106,7 +106,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { pMsg->contentLen = pMsg->contentLen; #endif - qDebugL("stream task string %s", (const char*)msg); + /*qDebugL("stream task string %s", (const char*)msg);*/ struct SSubplan* plan = NULL; int32_t code = qStringToSubplan(msg, &plan); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e42ced4669907b8114d32f7ea4f42e578124ec5a..db34f5dc6714ddf7b0be369c1da1ee5d5cb9b7df 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5311,4 +5311,4 @@ int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t ke pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);; return TSDB_CODE_SUCCESS; } - \ No newline at end of file + diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f48095ecb863dabc615fcea0b369eb473339b2a0..db63c71d1123cec32e810a5583deb0a936688070 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -9,11 +9,11 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true}; SQWorkerMgmt gQwMgmt = { - .lock = 0, - .qwRef = -1, - .qwNum = 0, + .lock = 0, + .qwRef = -1, + .qwNum = 0, }; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { @@ -110,9 +110,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { QW_LOCK(QW_READ, &mgmt->schLock); - QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash)); + /*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/ - void * key = NULL; + void *key = NULL; size_t keyLen = 0; int32_t i = 0; SQWSchStatus *sch = NULL; @@ -127,7 +127,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { QW_UNLOCK(QW_READ, &mgmt->schLock); - QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash)); + /*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/ } char *qwPhaseStr(int32_t phase) { @@ -462,7 +462,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { } int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -499,7 +499,7 @@ _return: } int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -550,11 +550,11 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t code = 0; bool qcontinue = true; - SSDataBlock * pRes = NULL; + SSDataBlock *pRes = NULL; uint64_t useconds = 0; int32_t i = 0; int32_t execNum = 0; - qTaskInfo_t * taskHandle = &ctx->taskHandle; + qTaskInfo_t *taskHandle = &ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; while (true) { @@ -632,7 +632,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) return TSDB_CODE_QRY_OUT_OF_MEMORY; } - void * key = NULL; + void *key = NULL; size_t keyLen = 0; int32_t i = 0; STaskStatus status = {0}; @@ -719,8 +719,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void } int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { - int32_t code = 0; - SQWTaskCtx * ctx = NULL; + int32_t code = 0; + SQWTaskCtx *ctx = NULL; SRpcHandleInfo *dropConnection = NULL; SRpcHandleInfo *cancelConnection = NULL; @@ -925,13 +925,13 @@ _return: } int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { - int32_t code = 0; - bool queryRsped = false; - SSubplan* plan = NULL; - SQWPhaseInput input = {0}; - qTaskInfo_t pTaskInfo = NULL; - DataSinkHandle sinkHandle = NULL; - SQWTaskCtx * ctx = NULL; + int32_t code = 0; + bool queryRsped = false; + SSubplan *plan = NULL; + SQWPhaseInput input = {0}; + qTaskInfo_t pTaskInfo = NULL; + DataSinkHandle sinkHandle = NULL; + SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -944,7 +944,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ctx->ctrlConnInfo = qwMsg->connInfo; - QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); + /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/ code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { @@ -1055,10 +1055,10 @@ _return: } int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { - SQWTaskCtx * ctx = NULL; + SQWTaskCtx *ctx = NULL; int32_t code = 0; SQWPhaseInput input = {0}; - void * rsp = NULL; + void *rsp = NULL; int32_t dataLen = 0; bool queryEnd = false; @@ -1138,8 +1138,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; int32_t dataLen = 0; bool locked = false; - SQWTaskCtx * ctx = NULL; - void * rsp = NULL; + SQWTaskCtx *ctx = NULL; + void *rsp = NULL; SQWPhaseInput input = {0}; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); @@ -1274,7 +1274,7 @@ _return: int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); @@ -1300,7 +1300,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; if (qwMsg->code) { QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req)); @@ -1338,28 +1338,28 @@ _return: qwMsg->connInfo.handle = NULL; } - QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); + /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/ QW_RET(TSDB_CODE_SUCCESS); } void qwProcessHbTimerEvent(void *param, void *tmrId) { - SQWHbParam* hbParam = (SQWHbParam*)param; + SQWHbParam *hbParam = (SQWHbParam *)param; if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) { return; } - int64_t refId = hbParam->refId; + int64_t refId = hbParam->refId; SQWorker *mgmt = qwAcquire(refId); if (NULL == mgmt) { QW_DLOG("qwAcquire %" PRIx64 "failed", refId); taosMemoryFree(param); return; } - + SQWSchStatus *sch = NULL; int32_t taskNum = 0; - SQWHbInfo * rspList = NULL; + SQWHbInfo *rspList = NULL; int32_t code = 0; qwDbgDumpMgmtInfo(mgmt); @@ -1383,7 +1383,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { return; } - void * key = NULL; + void *key = NULL; size_t keyLen = 0; int32_t i = 0; @@ -1413,29 +1413,27 @@ _return: for (int32_t j = 0; j < i; ++j) { qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); - QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), - (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); + /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/ + /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/ tFreeSSchedulerHbRsp(&rspList[j].rsp); } taosMemoryFreeClear(rspList); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); - qwRelease(refId); + qwRelease(refId); } void qwCloseRef(void) { taosWLockLatch(&gQwMgmt.lock); if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { taosCloseRef(gQwMgmt.qwRef); - gQwMgmt.qwRef= -1; + gQwMgmt.qwRef = -1; } taosWUnLockLatch(&gQwMgmt.lock); } -void qwDestroySchStatus(SQWSchStatus *pStatus) { - taosHashCleanup(pStatus->tasksHash); -} +void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); } void qwDestroyImpl(void *pMgmt) { SQWorker *mgmt = (SQWorker *)pMgmt; @@ -1454,12 +1452,12 @@ void qwDestroyImpl(void *pMgmt) { SQWSchStatus *sch = (SQWSchStatus *)pIter; qwDestroySchStatus(sch); pIter = taosHashIterate(mgmt->schHash, pIter); - } + } taosHashCleanup(mgmt->schHash); taosMemoryFree(mgmt); - atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); qwCloseRef(); } @@ -1467,7 +1465,7 @@ void qwDestroyImpl(void *pMgmt) { int32_t qwOpenRef(void) { taosWLockLatch(&gQwMgmt.lock); if (gQwMgmt.qwRef < 0) { - gQwMgmt.qwRef= taosOpenRef(100, qwDestroyImpl); + gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl); if (gQwMgmt.qwRef < 0) { taosWUnLockLatch(&gQwMgmt.lock); qError("init qworker ref failed"); @@ -1475,14 +1473,14 @@ int32_t qwOpenRef(void) { } } taosWUnLockLatch(&gQwMgmt.lock); - + return TSDB_CODE_SUCCESS; } void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { int32_t paramIdx = 0; int32_t newParamIdx = 0; - + while (true) { paramIdx = atomic_load_32(&gQwMgmt.paramIdx); if (paramIdx == tListLen(gQwMgmt.param)) { @@ -1490,7 +1488,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { } else { newParamIdx = paramIdx + 1; } - + if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) { break; } @@ -1577,12 +1575,12 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW SQWHbParam *param = NULL; qwSetHbParam(mgmt->refId, ¶m); - mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer); + mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer); if (NULL == mgmt->hbTimer) { qError("start hb timer failed"); QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + *qWorkerMgmt = mgmt; qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt); @@ -1599,9 +1597,9 @@ _return: taosTmrCleanUp(mgmt->timer); taosMemoryFreeClear(mgmt); - atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); } - + QW_RET(code); } @@ -1678,7 +1676,7 @@ int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64 } int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) { - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -1705,7 +1703,7 @@ int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId } int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index cb084ad5374ce51b539f333d0e82c04a207e4d0c..9965772c75eb62db7355a421e334d2cee171f1ea 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -24,7 +24,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1); sql insert into t1 values(1648791243003,4,2,3,3.1); sql insert into t1 values(1648791213004,4,2,3,4.1); sleep 1000 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; if $rows != 4 then print ======$rows @@ -137,7 +137,7 @@ endi sql insert into t1 values(1648791223001,12,14,13,11.1); sleep 500 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; if $rows != 4 then print ======$rows @@ -250,7 +250,7 @@ endi sql insert into t1 values(1648791223002,12,14,13,11.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 2 then @@ -280,7 +280,7 @@ endi sql insert into t1 values(1648791223003,12,14,13,11.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 3 then @@ -312,7 +312,7 @@ sql insert into t1 values(1648791223001,1,1,1,1.1); sql insert into t1 values(1648791223002,2,2,2,2.1); sql insert into t1 values(1648791223003,3,3,3,3.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 3 then @@ -344,7 +344,7 @@ sql insert into t1 values(1648791233003,3,2,3,2.1); sql insert into t1 values(1648791233002,5,6,7,8.1); sql insert into t1 values(1648791233002,3,2,3,2.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 2 if $data21 != 2 then @@ -374,7 +374,7 @@ endi sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 0 if $data01 != 4 then @@ -404,7 +404,7 @@ endi sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 4 then