diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 23455b6f50dfec466bd06e68481d7aaff31535ae..edb588b5544531d538529e3e027cf3dedeb44a0d 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -175,7 +175,7 @@ void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSch strcpy(pObj->sqlStr, sqlStr); pObj->pSchema = tdDupSchema(pSchema); - pObj->rowSize = pSchema->tlen; + pObj->rowSize = schemaTLen(pSchema); cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); @@ -272,13 +272,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { pBlk->sversion = htonl(pSchema->version); pBlk->padding = 0; + pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowLen(trow); + pMsg->header.vgId = htonl(pContext->vgId); - pMsg->header.contLen = htonl(size - sizeof(SWalHead)); + pMsg->header.contLen = htonl(pHead->len); pMsg->length = pMsg->header.contLen; pMsg->numOfBlocks = htonl(1); pHead->msgType = TSDB_MSG_TYPE_SUBMIT; - pHead->len = size - sizeof(SWalHead); pHead->version = 0; // write into vnode write queue diff --git a/tests/script/general/stream/table_replica1_vnoden.sim b/tests/script/general/stream/table_replica1_vnoden.sim index 44d4008dbd36594575472477b4a36887bc5f25ba..e1d5a9babf5147ed9d4815ba00608d116d5b3773 100644 --- a/tests/script/general/stream/table_replica1_vnoden.sim +++ b/tests/script/general/stream/table_replica1_vnoden.sim @@ -196,8 +196,8 @@ $st = $stPrefix . as sql create table $st as select count(tbcol) as c from $tb interval(1d) print =============== step16 -print sleep 22 seconds -sleep 22000 +print sleep 120 seconds +sleep 120000 print =============== step17 $st = $stPrefix . c1