diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d7f1881209a78ee6f00c3e0bb2528f5eb6bf4bdd..e92bb5e64e604f11e39c85c2fceb9bc7357a9f55 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -404,6 +404,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void *param, void **taos); void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; +int doAsyncParseSql(SSqlObj* pSql, const char* sqlstr, size_t sqlLen); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index ebcdddffdee3f4be6e70048c2d0e170a9c38e1f2..bd2dc64ef0b221c3f8491501951507bf67263c52 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -40,30 +40,23 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - pSql->signature = pSql; - pSql->param = param; - pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_MAX_REPLICA_NUM; - pSql->fp = fp; - - sem_init(&pSql->rspSem, 0, 0); - if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { +int doAsyncParseSql(SSqlObj* pSql, const char* sqlstr, size_t sqlLen) { + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); + if (code != TSDB_CODE_SUCCESS) { tscError("failed to malloc payload"); - tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); - return; + tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_CLI_OUT_OF_MEMORY); + return code; } // todo check for OOM problem pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); - tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); + tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_CLI_OUT_OF_MEMORY); free(pCmd->payload); - return; + return TSDB_CODE_CLI_OUT_OF_MEMORY; } pRes->qhandle = 0; @@ -72,7 +65,17 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const strtolower(pSql->sqlstr, sqlstr); tscDump("%p SQL: %s", pSql, pSql->sqlstr); - int32_t code = tsParseSql(pSql, true); + return tsParseSql(pSql, true); +} + +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { + pSql->signature = pSql; + pSql->param = param; + pSql->pTscObj = pObj; + pSql->maxRetry = TSDB_MAX_REPLICA_NUM; + pSql->fp = fp; + + int32_t code = doAsyncParseSql(pSql, sqlstr, sqlLen); if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 83a097f2f57793d062427df6d19f92ca84eadf46..028ae0592a7a2d6898151ef844d579b3b0b58264 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -497,46 +497,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } pSql->signature = pSql; + pSql->param = pSql; pSql->pTscObj = pObj; + pSql->fp = asyncCallback; + SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - int ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); - if (TSDB_CODE_SUCCESS != ret) { - setErrorInfo(pSql, ret, NULL); - free(pSql); - return NULL; - } - - pSql->sqlstr = strdup(sqlstr); - if (pSql->sqlstr == NULL) { - setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL); - - tfree(pSql); - return NULL; - } tsem_init(&pSql->rspSem, 0, 0); - - SSqlInfo SQLInfo = {0}; - tSQLParse(&SQLInfo, pSql->sqlstr); - - tscResetSqlCmdObj(&pSql->cmd); - ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE); - if (TSDB_CODE_SUCCESS != ret) { - setErrorInfo(pSql, ret, NULL); - tscError("%p open stream failed, sql:%s, code:%d", pSql, sqlstr, TSDB_CODE_CLI_OUT_OF_MEMORY); - tscFreeSqlObj(pSql); - return NULL; - } - - pSql->param = pSql; - pSql->fp = asyncCallback; - pRes->code = tscToSQLCmd(pSql, &SQLInfo); - if (pRes->code == TSDB_CODE_ACTION_IN_PROGRESS) { + int32_t code = doAsyncParseSql(pSql, sqlstr, strlen(sqlstr)); + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { sem_wait(&pSql->rspSem); } - - SQLInfoDestroy(&SQLInfo); if (pRes->code != TSDB_CODE_SUCCESS) { setErrorInfo(pSql, pRes->code, pCmd->payload); @@ -575,6 +547,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime); int64_t starttime = tscGetLaunchTimestamp(pStream); + pCmd->command = TSDB_SQL_SELECT; taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); tscTrace("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 401d61b7a25b737de6ab5db3256f7a50b993f766..5c936388dd19e1e67b4a0ad52c7bc528e57a7405 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -75,7 +75,14 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { strcpy(pContext->user, pCfg->user); strcpy(pContext->pass, pCfg->pass); - strcpy(pContext->db, pCfg->db); + const char* db = pCfg->db; + for (const char* p = db; *p != 0; p++) { + if (*p == '.') { + db = p + 1; + break; + } + } + strcpy(pContext->db, db); pContext->vgId = pCfg->vgId; pContext->cqWrite = pCfg->cqWrite; pContext->ahandle = ahandle; diff --git a/tests/pytest/stream/stream1.py b/tests/pytest/stream/stream1.py new file mode 100644 index 0000000000000000000000000000000000000000..7a9d88da3b2aa7bedae0824cac89349dcbeb5040 --- /dev/null +++ b/tests/pytest/stream/stream1.py @@ -0,0 +1,131 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tbNum = 10 + rowNum = 20 + + tdSql.prepare() + + tdLog.info("===== step1 =====") + tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)") + for i in range(tbNum): + tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i)) + for j in range(rowNum): + tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j)) + time.sleep(0.1) + + tdLog.info("===== step2 =====") + tdSql.query("select count(*), count(col1), count(col2) from tb0 interval(1d)") + tdSql.checkData(0, 1, rowNum) + tdSql.checkData(0, 2, rowNum) + tdSql.checkData(0, 3, rowNum) + tdSql.query("show tables") + tdSql.checkRows(tbNum) + tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)") + tdSql.query("show tables") + tdSql.checkRows(tbNum + 1) + + tdLog.info("===== step3 =====") + tdLog.info("sleeping 120 seconds") + time.sleep(120) + tdSql.query("select * from s0") + tdSql.checkData(0, 1, rowNum) + tdSql.checkData(0, 2, rowNum) + tdSql.checkData(0, 3, rowNum) + + tdLog.info("===== step4 =====") + tdSql.execute("drop table s0") + tdSql.query("show tables") + tdSql.checkRows(tbNum) + + tdLog.info("===== step5 =====") + tdSql.error("select * from s0") + + tdLog.info("===== step6 =====") + time.sleep(0.1) + tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)") + tdSql.query("show tables") + tdSql.checkRows(tbNum + 1) + + tdLog.info("===== step7 =====") + tdLog.info("sleeping 120 seconds") + time.sleep(120) + + tdSql.query("select * from s0") + tdSql.checkData(0, 1, rowNum) + tdSql.checkData(0, 2, rowNum) + tdSql.checkData(0, 3, rowNum) + + tdLog.info("===== step8 =====") + tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)") + tdSql.checkData(0, 1, rowNum * tbNum) + tdSql.checkData(0, 2, rowNum * tbNum) + tdSql.checkData(0, 3, rowNum * tbNum) + tdSql.query("show tables") + tdSql.checkRows(tbNum + 1) + + tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)") + tdSql.query("show tables") + tdSql.checkRows(tbNum + 2) + + tdLog.info("===== step9 =====") + tdLog.info("sleeping 120 seconds") + time.sleep(120) + + tdSql.query("select * from s1") + tdSql.checkData(0, 1, rowNum * tbNum) + tdSql.checkData(0, 2, rowNum * tbNum) + tdSql.checkData(0, 3, rowNum * tbNum) + + tdLog.info("===== step10 =====") + tdSql.execute("drop table s1") + tdSql.query("show tables") + tdSql.checkRows(tbNum + 1) + + tdLog.info("===== step11 =====") + tdSql.error("select * from s1") + + tdLog.info("===== step12 =====") + tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)") + tdSql.query("show tables") + tdSql.checkRows(tbNum + 2) + + tdLog.info("===== step13 =====") + tdLog.info("sleeping 120 seconds") + time.sleep(120) + tdSql.query("select * from s1") + tdSql.checkData(0, 1, rowNum * tbNum) + tdSql.checkData(0, 2, rowNum * tbNum) + tdSql.checkData(0, 3, rowNum * tbNum) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())