提交 51de20d2 编写于 作者: B Bomin Zhang

td-449: more cases passedcurrently, only below four cases fail:*...

td-449: more cases passedcurrently, only below four cases fail:* agg_stream.sim ('us' ?)* column_stream.sim ('us' ?)* table_replica1_vnoden.sim (crash)* new_stream.sim (failed at line 101)
上级 81ea4a8a
......@@ -404,7 +404,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, void **taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
int doAsyncParseSql(SSqlObj* pSql, const char* sqlstr, size_t sqlLen);
int doAsyncParseSql(SSqlObj* pSql);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
......
......@@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
int doAsyncParseSql(SSqlObj* pSql, const char* sqlstr, size_t sqlLen) {
int doAsyncParseSql(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
......@@ -50,21 +50,10 @@ int doAsyncParseSql(SSqlObj* pSql, const char* sqlstr, size_t sqlLen) {
return code;
}
// todo check for OOM problem
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_CLI_OUT_OF_MEMORY);
free(pCmd->payload);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pRes->qhandle = 0;
pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sqlstr);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
return tsParseSql(pSql, true);
}
......@@ -74,8 +63,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp;
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_CLI_OUT_OF_MEMORY);
return;
}
strtolower(pSql->sqlstr, sqlstr);
int32_t code = doAsyncParseSql(pSql, sqlstr, sqlLen);
int32_t code = doAsyncParseSql(pSql);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) {
......@@ -521,15 +517,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pSql->pStream) {
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
/*
* NOTE:
* transfer the sql function for super table query before get meter/metric meta,
* since in callback functions, only tscProcessSql(pStream->pSql) is executed!
*/
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream);
tsParseSql(pSql, false);
sem_post(&pSql->rspSem);
return;
} else {
tscTrace("%p get tableMeta successfully", pSql);
}
......
......@@ -99,7 +99,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
}
tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
tscProcessSql(pStream->pSql);
tscDoQuery(pStream->pSql);
tscIncStreamExecutionCount(pStream);
}
......@@ -477,14 +477,6 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
}
}
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param);
pSql->res.code = code;
sem_post(&pSql->rspSem);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos;
......@@ -492,20 +484,34 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
return NULL;
}
pSql->signature = pSql;
pSql->param = pSql;
pSql->pTscObj = pObj;
pSql->fp = asyncCallback;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
}
pSql->pStream = pStream;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscFreeSqlObj(pSql);
return NULL;;
}
strtolower(pSql->sqlstr, sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
int32_t code = doAsyncParseSql(pSql, sqlstr, strlen(sqlstr));
int32_t code = doAsyncParseSql(pSql);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
sem_wait(&pSql->rspSem);
}
......@@ -518,15 +524,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return NULL;
}
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -540,7 +537,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey;
pSql->pStream = pStream;
tscAddIntoStreamList(pStream);
tscSetSlidingWindowInfo(pSql, pStream);
......
......@@ -145,6 +145,7 @@ void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row);
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
......
......@@ -246,13 +246,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int32_t flen = 0;
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
flen += TYPE_BYTES[pSchema->columns[i].type];
}
// construct data
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + flen;
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer;
......@@ -260,12 +254,15 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
SDataRow trow = (SDataRow)pBlk->data;
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
tdInitDataRow(trow, pSchema);
int toffset = 0;
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
tdAppendColVal(trow, row[i], pSchema->columns[i].type, pSchema->columns[i].bytes, toffset);
toffset += TYPE_BYTES[pSchema->columns[i].type];
STColumn *c = pSchema->columns + i;
char* val = (char*)row[i];
if (IS_VAR_DATA_TYPE(c->type)) {
val -= sizeof(VarDataLenT);
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
}
pBlk->len = htonl(dataRowLen(trow));
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tbNum = 10
rowNum = 20
totalNum = tbNum * rowNum
tdSql.prepare()
tdLog.info("===== step1 =====")
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
for i in range(tbNum):
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
for j in range(rowNum):
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
time.sleep(0.1)
tdLog.info("===== step2 =====")
tdSql.query("select count(col1) from tb0 interval(1d)")
tdSql.checkData(0, 1, rowNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdSql.execute("create table s0 as select count(col1) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step3 =====")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdLog.info("===== step4 =====")
tdSql.execute("drop table s0")
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdLog.info("===== step5 =====")
tdSql.error("select * from s0")
tdLog.info("===== step6 =====")
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step7 =====")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdLog.info("===== step8 =====")
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.checkData(0, 1, totalNum)
tdSql.checkData(0, 2, totalNum)
tdSql.checkData(0, 3, totalNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step9 =====")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, totalNum)
tdSql.checkData(0, 2, totalNum)
tdSql.checkData(0, 3, totalNum)
tdLog.info("===== step10 =====")
tdSql.execute("drop table s1")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step11 =====")
tdSql.error("select * from s1")
tdLog.info("===== step12 =====")
tdSql.execute("create table s1 as select count(col1) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step13 =====")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, totalNum)
#tdSql.checkData(0, 2, None)
#tdSql.checkData(0, 3, None)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -205,8 +205,8 @@ if $data01 != 20 then
endi
print =============== step21
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
print =============== step22
$st = $stPrefix . c1
......
......@@ -76,20 +76,20 @@ endw
sql drop table $mt
print =============== step4
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
print =============== step5
$st = $stPrefix . c3
sql select * from $st
print ===> select * from $st
print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $data01 != NULL then
if $data01 != null then
return -1
endi
if $data02 != NULL then
if $data02 != null then
return -1
endi
if $data03 != NULL then
if $data03 != null then
return -1
endi
......@@ -187,8 +187,8 @@ $st = $stPrefix . as
#sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7, avg(tbcol) as a8, sum(tbcol) as a9, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7 from $mt where ts < now + 4m interval(1d)
print =============== step9
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
print =============== step10
$st = $stPrefix . c3
......
......@@ -163,8 +163,8 @@ $st = $stPrefix . as
sql create table $st as select count(tbcol) as c from $mt interval(1d)
print =============== step13
print sleep 22 seconds
sleep 32000
print sleep 120 seconds
sleep 120000
print =============== step14
$st = $stPrefix . c1
......
......@@ -73,8 +73,8 @@ print =============== step3
sql create table $stt as select count(*) from $tb interval(1d)
sql create table $stm as select count(*) from $mt interval(1d)
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $stt
print select count(*) from $stt ===> $data00 $data01
......@@ -152,8 +152,8 @@ print =============== step8
sql create table $stt as select count(*) from $tb interval(1d)
sql create table $stm as select count(*) from $mt interval(1d)
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $stt
sleep 1000
......
......@@ -78,8 +78,8 @@ if $rows != 11 then
endi
print =============== step3
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $st
print select * from $st => $data01
if $data01 != 20 then
......@@ -112,8 +112,8 @@ if $rows != 11 then
endi
print =============== step7
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $st
print select * from $st => $data01
if $data01 != 20 then
......@@ -155,8 +155,8 @@ if $rows != 12 then
endi
print =============== step9
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $st
print select * from $st => $data01
if $data01 != 200 then
......@@ -190,8 +190,8 @@ if $rows != 12 then
endi
print =============== step13
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $st
print select * from $st => $data01
if $data01 != 200 then
......
......@@ -72,8 +72,8 @@ if $rows != 11 then
endi
print =============== step3
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $st
print select * from $st => $data01
if $data01 != 20 then
......@@ -100,8 +100,8 @@ if $rows != 11 then
endi
print =============== step7
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $st
print select * from $st => $data01
if $data01 != 20 then
......@@ -143,8 +143,8 @@ if $rows != 12 then
endi
print =============== step9
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $st
print select * from $st => $data01 $data02, $data03
if $data01 != 200 then
......@@ -178,17 +178,17 @@ if $rows != 12 then
endi
print =============== step13
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
sql select * from $st
print select * from $st => $data01 $data02, $data03
if $data01 != 200 then
return -1
endi
if $data02 != NULL then
if $data02 != null then
return -1
endi
if $data03 != NULL then
if $data03 != null then
return -1
endi
......@@ -79,8 +79,8 @@ $st = $stPrefix . c3
sql create table $st as select count(tbcol2) from $tb interval(1d)
print =============== step5
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
print =============== step6
$st = $stPrefix . c1
......@@ -173,8 +173,8 @@ $st = $stPrefix . c3
sql create table $st as select count(*), count(tbcol), count(tbcol2) from $tb interval(1d)
print =============== step10
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
print =============== step11
#$st = $stPrefix . c3
......
......@@ -79,8 +79,8 @@ sleep 1000
system sh/exec.sh -n dnode1 -s start
print =============== step4
print sleep 23 seconds
sleep 23000
print sleep 120 seconds
sleep 120000
print =============== step5
$i = 1
......
......@@ -214,8 +214,8 @@ sql select count(tbcol) from $tb where ts < now + 4m interval(1d) group by tgcol
step20:
print =============== step21
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
print =============== step22
$st = $stPrefix . c1
......
......@@ -71,20 +71,20 @@ print =============== step3
sql drop table $tb
print =============== step4
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
print =============== step5
$st = $stPrefix . c3
sql select * from $st
print ===> select * from $st
print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $data01 != NULL then
if $data01 != null then
return -1
endi
if $data02 != NULL then
if $data02 != null then
return -1
endi
if $data03 != NULL then
if $data03 != null then
return -1
endi
......@@ -191,8 +191,8 @@ $st = $stPrefix . as
#sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, stddev(tbcol) as a7, percentile(tbcol, 1) as a8, count(tbcol) as a9, leastsquares(tbcol, 1, 1) as a10 from $tb where ts < now + 4m interval(1d)
print =============== step10
print sleep 22 seconds
sleep 22000
print sleep 120 seconds
sleep 120000
print =============== step11
$st = $stPrefix . c3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册