未验证 提交 6c2e19c0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2178 from taosdata/feature/query

Feature/query
......@@ -191,14 +191,14 @@ typedef struct SDataBlockList { // todo remove
} SDataBlockList;
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert/import type
char slidingTimeUnit;
STimeWindow window;
int64_t intervalTime; // aggregation time interval
int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert/import type
char slidingTimeUnit;
STimeWindow window;
int64_t intervalTime; // aggregation time interval
int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
......@@ -207,11 +207,11 @@ typedef struct SQueryInfo {
SLimitVal slimit;
STagCond tagCond;
SOrderVal order;
int16_t fillType; // interpolate type
int16_t fillType; // final result fill type
int16_t numOfTables;
STableMetaInfo **pTableMetaInfo;
struct STSBuf * tsBuf;
int64_t * fillVal; // default value for interpolation
int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
......@@ -222,15 +222,15 @@ typedef struct SQueryInfo {
typedef struct {
int command;
uint8_t msgType;
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
union {
int32_t count;
int32_t numOfTablesInSubmit;
};
int32_t insertType;
int32_t clauseIndex; // index of multiple subclause query
int8_t parseFinished;
short numOfCols;
......@@ -239,14 +239,12 @@ typedef struct {
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
// for parameter ('?') binding and batch processing
int32_t batchSize;
int32_t numOfParams;
} SSqlCmd;
typedef struct SResRec {
......@@ -316,7 +314,6 @@ typedef struct SSqlObj {
SRpcIpSet ipList;
char freed : 4;
char listed : 4;
uint32_t insertType;
tsem_t rspSem;
SSqlCmd cmd;
SSqlRes res;
......@@ -361,7 +358,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet);
int tscProcessSql(SSqlObj *pSql);
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
int tscRenewTableMeta(SSqlObj *pSql, char *tableId);
void tscQueueAsyncRes(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param, int32_t code);
......
......@@ -442,15 +442,17 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (pSql->pStream == NULL) {
// check if it is a sub-query of super table query first, if true, enter another routine
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
tscTrace("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo->pTableMeta == NULL){
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS);
}
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
......@@ -460,32 +462,37 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
tscTrace("%p get metricMeta during super table query successfully", pSql);
code = tscGetSTableVgroupInfo(pSql, 0);
pRes->code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
} else { // normal async query continues
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pTableMetaInfo->vgroupList != NULL);
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
} else { // continue to process normal async query
if (pCmd->parseFinished) {
tscTrace("%p re-send data to vnode in table Meta callback since sql parsed completed", pSql);
tscTrace("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS);
if (pTableMetaInfo->pTableMeta) {
// todo update the submit message according to the new table meta
// 1. table uid, 2. ip address
code = tscSendMsgToServer(pSql);
if (code == TSDB_CODE_SUCCESS) return;
// if failed to process sql, go to error handler
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
// // todo update the submit message according to the new table meta
// // 1. table uid, 2. ip address
// code = tscSendMsgToServer(pSql);
// if (code == TSDB_CODE_SUCCESS) return;
// }
} else {
tscTrace("%p continue parse sql after get table meta", pSql);
code = tsParseSql(pSql, false);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STMT_INSERT) == TSDB_QUERY_TYPE_STMT_INSERT) {
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->pTableMeta != NULL);
(*pSql->fp)(pSql->param, pSql, code);
return;
}
......
......@@ -1293,7 +1293,7 @@ static void max_function_f(SQLFunctionCtx *pCtx, int32_t index) {
minMax_function_f(pCtx, index, 0);
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult == DATA_SET_FLAG) {
if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) {
char *flag = pCtx->aOutputBuf + pCtx->inputBytes;
*flag = DATA_SET_FLAG;
}
......@@ -1309,7 +1309,7 @@ static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) {
minMax_function_f(pCtx, index, 1);
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult == DATA_SET_FLAG) {
if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) {
char *flag = pCtx->aOutputBuf + pCtx->inputBytes;
*flag = DATA_SET_FLAG;
}
......
......@@ -1314,7 +1314,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pSql->insertType);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pCmd->insertType);
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
if (sToken.type != TK_INTO) {
......@@ -1342,7 +1342,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
* Set the fp before parse the sql string, in case of getTableMeta failed, in which
* the error handle callback function can rightfully restore the user-defined callback function (fp).
*/
if (initialParse && (pSql->insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
if (initialParse && (pSql->cmd.insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
}
......@@ -1354,9 +1354,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
return ret;
}
SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr);
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
}
......
......@@ -451,7 +451,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->qhandle = 0;
pSql->insertType = 0;
pSql->cmd.insertType = 0;
pSql->fetchFp = waitForQueryRsp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
......@@ -515,7 +515,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
SSqlRes *pRes = &pSql->res;
pSql->param = (void*) pSql;
pSql->fp = waitForQueryRsp;
pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
pSql->cmd.insertType = TSDB_QUERY_TYPE_STMT_INSERT;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p failed to malloc payload buffer", pSql);
......
......@@ -515,7 +515,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (ret != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
}
pCmd->parseFinished = 1;
return TSDB_CODE_SUCCESS; // do not build query message here
......@@ -543,6 +543,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
}
pSql->cmd.parseFinished = true;
return tscBuildMsg[pCmd->command](pSql, pInfo);
}
......
......@@ -1185,7 +1185,9 @@ bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage
int32_t ret = 0; // merge all result by default
int16_t functionId = pLocalReducer->pCtx[0].functionId;
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { // column projection query
// todo opt performance
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0))) { // column projection query
ret = 1; // disable merge procedure
} else {
tOrderDescriptor *pDesc = pLocalReducer->pDesc;
......
......@@ -239,16 +239,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
/*
* not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
* the virtual node may have not create table till now, so try again by using the new metermeta.
* 2. this requested table may have been removed by other client, so we need to renew the
* metermeta here.
*
* not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
* removed. So, renew metermeta and try again.
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
*/
if (pCmd->command == TSDB_SQL_CONNECT) {
rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcFreeCont(rpcMsg->pCont);
......@@ -258,8 +248,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
rpcFreeCont(rpcMsg->pCont);
return;
} else if (pCmd->command == TSDB_SQL_META) {
// rpcFreeCont(rpcMsg->pCont);
// return;
// get table meta query will not retry, do nothing
} else {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
......@@ -267,13 +256,14 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
} else {
rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
if (pTableMetaInfo->pTableMeta) {
tscSendMsgToServer(pSql);
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);
// if there is an error occurring, proceed to the following error handling procedure.
// todo add test cases
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcFreeCont(rpcMsg->pCont);
return;
}
rpcFreeCont(rpcMsg->pCont);
return;
}
}
}
......@@ -330,9 +320,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
}
}
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
}
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? pRes->numOfRows: pRes->code;
......@@ -431,7 +421,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
/*
* here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
* sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
//taosStopRpcConn(pSql->pSubs[i]->thandle);
......@@ -565,7 +555,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo);
} else {
} else { // it is a subquery of the super table query, this IP info is acquired from vgroupInfo
int32_t index = pTableMetaInfo->vgroupIndex;
int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups);
......@@ -1822,7 +1812,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscTrace("%p recv table meta: %"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
tscTrace("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
free(pTableMeta);
return TSDB_CODE_SUCCESS;
......@@ -2358,7 +2348,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
int32_t code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated
}
return code;
......@@ -2389,56 +2379,26 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
return tscGetTableMeta(pSql, pTableMetaInfo);
}
/*
* in handling the renew metermeta problem during insertion,
*
* If the meter is created on demand during insertion, the routine usually waits for a short
* period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
* successfully created the corresponding table.
*/
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
if (pCmd->command == TSDB_SQL_INSERT) {
taosMsleep(50); // todo: global config
}
}
/**
* in renew metermeta, do not retrieve metadata in cache.
* retrieve table meta from mnode, and update the local table meta cache.
* @param pSql sql object
* @param tableId meter id
* @param tableId table full name
* @return status code
*/
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
int code = 0;
// handle table meta renew process
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/*
* 1. only update the metermeta in force model metricmeta is not updated
* 2. if get metermeta failed, still get the metermeta
*/
if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) {
tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
}
tscWaitingForCreateTable(pCmd);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
code = getTableMetaFromMgmt(pSql, pTableMetaInfo); // todo ??
} else {
tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
pTableMetaInfo->pTableMeta);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) {
tscTrace("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
}
return code;
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
return getTableMetaFromMgmt(pSql, pTableMetaInfo);
}
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
......
......@@ -322,7 +322,7 @@ enum {
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql);
SSqlInfo qSQLParse(const char *str);
#ifdef __cplusplus
}
......
......@@ -373,7 +373,6 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
SPosInfo pos = {-1, -1};
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos);
}
pWindowResInfo->capacity = newCap;
}
......@@ -1566,11 +1565,6 @@ static bool isFirstLastRowQuery(SQuery *pQuery) {
return false;
}
static UNUSED_FUNC bool notHasQueryTimeRange(SQuery *pQuery) {
return (pQuery->window.skey == 0 && pQuery->window.ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->window.skey == INT64_MAX && pQuery->window.ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery)));
}
static bool needReverseScan(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
......@@ -1768,61 +1762,6 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
}
}
static UNUSED_FUNC void doSetInterpVal(SQLFunctionCtx *pCtx, TSKEY ts, int16_t type, int32_t index, char *data) {
assert(pCtx->param[index].pz == NULL);
int32_t len = 0;
size_t t = 0;
if (type == TSDB_DATA_TYPE_BINARY) {
t = strlen(data);
len = t + 1 + TSDB_KEYSIZE;
pCtx->param[index].pz = calloc(1, len);
} else if (type == TSDB_DATA_TYPE_NCHAR) {
t = wcslen((const wchar_t *)data);
len = (t + 1) * TSDB_NCHAR_SIZE + TSDB_KEYSIZE;
pCtx->param[index].pz = calloc(1, len);
} else {
len = TSDB_KEYSIZE * 2;
pCtx->param[index].pz = malloc(len);
}
pCtx->param[index].nType = TSDB_DATA_TYPE_BINARY;
char *z = pCtx->param[index].pz;
*(TSKEY *)z = ts;
z += TSDB_KEYSIZE;
switch (type) {
case TSDB_DATA_TYPE_FLOAT:
*(double *)z = GET_FLOAT_VAL(data);
break;
case TSDB_DATA_TYPE_DOUBLE:
*(double *)z = GET_DOUBLE_VAL(data);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
*(int64_t *)z = GET_INT64_VAL(data);
break;
case TSDB_DATA_TYPE_BINARY:
strncpy(z, data, t);
break;
case TSDB_DATA_TYPE_NCHAR: {
wcsncpy((wchar_t *)z, (const wchar_t *)data, t);
} break;
default:
assert(0);
}
pCtx->param[index].nLen = len;
}
static int32_t getInitialPageNum(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t INITIAL_RESULT_ROWS_VALUE = 16;
......@@ -4071,43 +4010,17 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type);
}
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
/*
* in case of last_row query without query range, we set the query timestamp to be
* STable->lastKey. Otherwise, keep the initial query time range unchanged.
*/
// if (isFirstLastRowQuery(pQuery)) {
// if (!normalizeUnBoundLastRowQuery(pQInfo, &interpInfo)) {
// sem_post(&pQInfo->dataReady);
// pointInterpSupporterDestroy(&interpInfo);
// return TSDB_CODE_SUCCESS;
// }
// }
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput,
pQuery->slidingTime, pQuery->fillType, pColInfo);
}
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
return TSDB_CODE_SUCCESS;
}
static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupInfo *pSidset) {
if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
return false;
}
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (pColIndex->flag == TSDB_COL_TAG) {
return true;
}
}
// todo refactor
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
return false;
return TSDB_CODE_SUCCESS;
}
static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -5907,10 +5820,11 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
_over:
tfree(tagCond);
tfree(tbnameCond);
tfree(pGroupColIndex);
taosArrayDestroy(pTableIdList);
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
if (code != TSDB_CODE_SUCCESS) {
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
*pQInfo = NULL;
}
......
......@@ -26,16 +26,18 @@
#include "tstrbuild.h"
#include "queryLog.h"
int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {
SSqlInfo qSQLParse(const char *pStr) {
void *pParser = ParseAlloc(malloc);
pSQLInfo->valid = true;
SSqlInfo sqlInfo = {0};
sqlInfo.valid = true;
int32_t i = 0;
while (1) {
SSQLToken t0 = {0};
if (pStr[i] == 0) {
Parse(pParser, 0, t0, pSQLInfo);
Parse(pParser, 0, t0, &sqlInfo);
goto abort_parse;
}
......@@ -49,19 +51,19 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {
break;
}
case TK_SEMI: {
Parse(pParser, 0, t0, pSQLInfo);
Parse(pParser, 0, t0, &sqlInfo);
goto abort_parse;
}
case TK_QUESTION:
case TK_ILLEGAL: {
snprintf(pSQLInfo->pzErrMsg, tListLen(pSQLInfo->pzErrMsg), "unrecognized token: \"%s\"", t0.z);
pSQLInfo->valid = false;
snprintf(sqlInfo.pzErrMsg, tListLen(sqlInfo.pzErrMsg), "unrecognized token: \"%s\"", t0.z);
sqlInfo.valid = false;
goto abort_parse;
}
default:
Parse(pParser, t0.type, t0, pSQLInfo);
if (pSQLInfo->valid == false) {
Parse(pParser, t0.type, t0, &sqlInfo);
if (sqlInfo.valid == false) {
goto abort_parse;
}
}
......@@ -69,7 +71,7 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {
abort_parse:
ParseFree(pParser, free);
return 0;
return sqlInfo;
}
tSQLExprList *tSQLExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SSQLToken *pToken) {
......
......@@ -21,14 +21,16 @@
#include <stdlib.h>
#include <string.h>
#include <taos.h> // TAOS header file
#include <unistd.h>
void taosMsleep(int mseconds);
#include <sys/time.h>
#include <inttypes.h>
static int32_t doQuery(TAOS* taos, const char* sql) {
struct timeval t1 = {0};
gettimeofday(&t1, NULL);
TAOS_RES* res = taos_query(taos, sql);
if (taos_errno(res) != 0) {
printf("failed to execute query, reason:%s\n", taos_errstr(taos));
printf("failed to execute query, reason:%s\n", taos_errstr(res));
return -1;
}
......@@ -38,13 +40,19 @@ static int32_t doQuery(TAOS* taos, const char* sql) {
int32_t numOfFields = taos_num_fields(res);
TAOS_FIELD* pFields = taos_fetch_fields(res);
int32_t i = 0;
while((row = taos_fetch_row(res)) != NULL) {
taos_print_row(buf, row, pFields, numOfFields);
printf("%s\n", buf);
printf("%d:%s\n", ++i, buf);
memset(buf, 0, 512);
}
taos_free_result(res);
struct timeval t2 = {0};
gettimeofday(&t2, NULL);
printf("elapsed time:%"PRId64 " ms\n", ((t2.tv_sec*1000000 + t2.tv_usec) - (t1.tv_sec*1000000 + t1.tv_usec))/1000);
return 0;
}
......@@ -101,14 +109,18 @@ int main(int argc, char *argv[]) {
taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(taos));
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
exit(1);
}
printf("success to connect to server\n");
printf("success to connect to server\n");
// doQuery(taos, "select c1,count(*) from group_db0.group_mt0 where c1<8 group by c1");
doQuery(taos, "select * from test.m1");
// multiThreadTest(1, taos);
doQuery(taos, "use test");
doQuery(taos, "alter table tm99 set tag a=99");
// doQuery(taos, "select tbname from test.m1");
// doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0') interval(1s) group by t1");
// doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0', 'lm2_tb1', 'lm2_tb2') interval(1s)");
// for(int32_t i = 0; i < 100000; ++i) {
// doQuery(taos, "insert into t1 values(now, 2)");
// }
......
......@@ -180,7 +180,7 @@ if $data03 != 0 then
endi
print $data04
if $data04 != 0.0000 then
if $data04 != 0.00000 then
return -1
endi
......@@ -201,7 +201,8 @@ if $data13 != 1 then
return -1
endi
if $data14 != 1.0000 then
if $data14 != 1.00000 then
print expect 1.00000, actual:$data14
return -1
endi
......@@ -345,6 +346,19 @@ if $data94 != 9 then
return -1
endi
sql select c1,sum(c1),avg(c1),count(*) from group_mt0 where c1<5 group by c1;
if $row != 5 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data01 != 800 then
return -1
endi
sql select first(c1), last(ts), first(ts), last(c1),sum(c1),avg(c1),count(*) from group_mt0 where c1<20 group by tbname,c1;
if $row != 160 then
return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册