未验证 提交 ac5789d9 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #6320 from taosdata/feature/query

Feature/query
...@@ -1204,7 +1204,6 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { ...@@ -1204,7 +1204,6 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
return pStmt->pSql->res.code; return pStmt->pSql->res.code;
} }
int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
...@@ -1234,28 +1233,28 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { ...@@ -1234,28 +1233,28 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
pStmt->mtb.tagSet = true; pStmt->mtb.tagSet = true;
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false); sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n > 0 && sToken.type == TK_VALUES) { if (sToken.n > 0 && (sToken.type == TK_VALUES || sToken.type == TK_LP)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (sToken.n <= 0 || sToken.type != TK_USING) { if (sToken.n <= 0 || sToken.type != TK_USING) {
return TSDB_CODE_TSC_INVALID_OPERATION; return tscSQLSyntaxErrMsg(pCmd->payload, "keywords USING is expected", sToken.z);
} }
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false); sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || ((sToken.type != TK_ID) && (sToken.type != TK_STRING))) { if (sToken.n <= 0 || ((sToken.type != TK_ID) && (sToken.type != TK_STRING))) {
return TSDB_CODE_TSC_INVALID_OPERATION; return tscSQLSyntaxErrMsg(pCmd->payload, "invalid token", sToken.z);
} }
pStmt->mtb.stbname = sToken; pStmt->mtb.stbname = sToken;
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false); sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_TAGS) { if (sToken.n <= 0 || sToken.type != TK_TAGS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return tscSQLSyntaxErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
} }
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false); sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_LP) { if (sToken.n <= 0 || sToken.type != TK_LP) {
return TSDB_CODE_TSC_INVALID_OPERATION; return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z);
} }
pStmt->mtb.tags = taosArrayInit(4, sizeof(SStrToken)); pStmt->mtb.tags = taosArrayInit(4, sizeof(SStrToken));
...@@ -1298,9 +1297,6 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { ...@@ -1298,9 +1297,6 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAOS_BIND* tags) { int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAOS_BIND* tags) {
size_t tagNum = taosArrayGetSize(pStmt->mtb.tags); size_t tagNum = taosArrayGetSize(pStmt->mtb.tags);
size_t size = 1048576; size_t size = 1048576;
...@@ -1386,8 +1382,6 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO ...@@ -1386,8 +1382,6 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// interface functions // interface functions
......
...@@ -7482,6 +7482,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7482,6 +7482,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg1 = "point interpolation query needs timestamp"; const char* msg1 = "point interpolation query needs timestamp";
const char* msg2 = "too many tables in from clause"; const char* msg2 = "too many tables in from clause";
const char* msg3 = "start(end) time of query range required or time range too large"; const char* msg3 = "start(end) time of query range required or time range too large";
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
const char* msg9 = "only tag query not compatible with normal column filter"; const char* msg9 = "only tag query not compatible with normal column filter";
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -7540,11 +7541,25 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7540,11 +7541,25 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) { if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} else { } else {
if (isTimeWindowQuery(pQueryInfo) && if (isTimeWindowQuery(pQueryInfo)) {
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { // check if the first column of the nest query result is timestamp column
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, 0);
if (pCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
if (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
} }
}
// set order by info
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMeta)) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
} else { } else {
pQueryInfo->command = TSDB_SQL_SELECT; pQueryInfo->command = TSDB_SQL_SELECT;
...@@ -7697,8 +7712,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7697,8 +7712,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
SExprInfo** p = NULL; SExprInfo** p = NULL;
int32_t numOfExpr = 0; int32_t numOfExpr = 0;
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = createProjectionExpr(pQueryInfo, pTableMetaInfo, &p, &numOfExpr); code = createProjectionExpr(pQueryInfo, pTableMetaInfo, &p, &numOfExpr);
if (pQueryInfo->exprList1 == NULL) { if (pQueryInfo->exprList1 == NULL) {
pQueryInfo->exprList1 = taosArrayInit(4, POINTER_BYTES); pQueryInfo->exprList1 = taosArrayInit(4, POINTER_BYTES);
} }
......
...@@ -477,7 +477,6 @@ int doBuildAndSendMsg(SSqlObj *pSql) { ...@@ -477,7 +477,6 @@ int doBuildAndSendMsg(SSqlObj *pSql) {
pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_INSERT ||
pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_CONNECT ||
pCmd->command == TSDB_SQL_HB || pCmd->command == TSDB_SQL_HB ||
// pCmd->command == TSDB_SQL_META ||
pCmd->command == TSDB_SQL_STABLEVGROUP) { pCmd->command == TSDB_SQL_STABLEVGROUP) {
pRes->code = tscBuildMsg[pCmd->command](pSql, NULL); pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
} }
......
...@@ -627,7 +627,7 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { ...@@ -627,7 +627,7 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
char *z = NULL; char *z = NULL;
if (len > 0) { if (len > 0) {
z = strstr(pCmd->payload, "invalid SQL"); z = strstr(pCmd->payload, "invalid operation");
if (z == NULL) { if (z == NULL) {
z = strstr(pCmd->payload, "syntax error"); z = strstr(pCmd->payload, "syntax error");
} }
......
...@@ -780,7 +780,9 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { ...@@ -780,7 +780,9 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SSDataBlock* pBlock = pInput->block; SSDataBlock* pBlock = pInput->block;
if (pOperator->pRuntimeEnv != NULL) {
pOperator->pRuntimeEnv->current = pInput->pTableQueryInfo; pOperator->pRuntimeEnv->current = pInput->pTableQueryInfo;
}
pBlock->info.rows = pRes->numOfRows; pBlock->info.rows = pRes->numOfRows;
if (pRes->numOfRows != 0) { if (pRes->numOfRows != 0) {
...@@ -804,18 +806,9 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { ...@@ -804,18 +806,9 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
return pBlock; return pBlock;
} }
SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
assert(pOperator->numOfUpstream > 1);
SJoinOperatorInfo* pJoinInfo = pOperator->info; SJoinOperatorInfo* pJoinInfo = pOperator->info;
pJoinInfo->pRes->info.rows = 0;
while(1) {
for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) { for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
SJoinStatus* pStatus = &pJoinInfo->status[i]; SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) { if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
...@@ -824,12 +817,29 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { ...@@ -824,12 +817,29 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
if (pStatus->pBlock == NULL) { if (pStatus->pBlock == NULL) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows; pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows;
return pJoinInfo->pRes; break;
} }
} }
} }
}
SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
assert(pOperator->numOfUpstream > 1);
SJoinOperatorInfo* pJoinInfo = pOperator->info;
pJoinInfo->pRes->info.rows = 0;
while(1) {
fetchNextBlockIfCompleted(pOperator, newgroup);
if (pOperator->status == OP_EXEC_DONE) {
return pJoinInfo->pRes;
}
SJoinStatus* st0 = &pJoinInfo->status[0]; SJoinStatus* st0 = &pJoinInfo->status[0];
SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0); SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0);
...@@ -847,8 +857,12 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { ...@@ -847,8 +857,12 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
if (ts[st->index] < ts0[st0->index]) { // less than the first if (ts[st->index] < ts0[st0->index]) { // less than the first
prefixEqual = false; prefixEqual = false;
if ((++(st->index)) >= st->pBlock->info.rows) { if ((++(st->index)) >= st->pBlock->info.rows) {
break; fetchNextBlockIfCompleted(pOperator, newgroup);
if (pOperator->status == OP_EXEC_DONE) {
return pJoinInfo->pRes;
}
} }
} else if (ts[st->index] > ts0[st0->index]) { // greater than the first; } else if (ts[st->index] > ts0[st0->index]) { // greater than the first;
if (prefixEqual == true) { if (prefixEqual == true) {
...@@ -856,12 +870,19 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { ...@@ -856,12 +870,19 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
for (int32_t j = 0; j < i; ++j) { for (int32_t j = 0; j < i; ++j) {
SJoinStatus* stx = &pJoinInfo->status[j]; SJoinStatus* stx = &pJoinInfo->status[j];
if ((++(stx->index)) >= stx->pBlock->info.rows) { if ((++(stx->index)) >= stx->pBlock->info.rows) {
break;
fetchNextBlockIfCompleted(pOperator, newgroup);
if (pOperator->status == OP_EXEC_DONE) {
return pJoinInfo->pRes;
}
} }
} }
} else { } else {
if ((++(st0->index)) >= st0->pBlock->info.rows) { if ((++(st0->index)) >= st0->pBlock->info.rows) {
break; fetchNextBlockIfCompleted(pOperator, newgroup);
if (pOperator->status == OP_EXEC_DONE) {
return pJoinInfo->pRes;
}
} }
} }
} }
...@@ -1132,6 +1153,19 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1132,6 +1153,19 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
memcpy(schema, pSchema, numOfCol1*sizeof(SSchema)); memcpy(schema, pSchema, numOfCol1*sizeof(SSchema));
} }
// update the exprinfo
int32_t numOfOutput = (int32_t)tscNumOfExprs(px);
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pex = taosArrayGetP(px->exprList, i);
int32_t colId = pex->base.colInfo.colId;
for(int32_t j = 0; j < pSourceOperator->numOfOutput; ++j) {
if (colId == schema[j].colId) {
pex->base.colInfo.colIndex = j;
break;
}
}
}
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
tfree(pColumnInfo); tfree(pColumnInfo);
tfree(schema); tfree(schema);
......
...@@ -16,13 +16,13 @@ ...@@ -16,13 +16,13 @@
*/ */
package com.taosdata.jdbc; package com.taosdata.jdbc;
import com.taosdata.jdbc.utils.TaosInfo;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLWarning; import java.sql.SQLWarning;
import java.util.List; import java.util.List;
import com.taosdata.jdbc.utils.TaosInfo;
/** /**
* JNI connector * JNI connector
*/ */
...@@ -276,23 +276,14 @@ public class TSDBJNIConnector { ...@@ -276,23 +276,14 @@ public class TSDBJNIConnector {
private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes); private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes);
public long prepareStmt(String sql) throws SQLException { public long prepareStmt(String sql) throws SQLException {
Long stmt = 0L; Long stmt = prepareStmtImp(sql.getBytes(), this.taos);
try { if (stmt == TSDBConstants.JNI_TDENGINE_ERROR) {
stmt = prepareStmtImp(sql.getBytes(), this.taos); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_SQL);
} catch (Exception e) { } else if (stmt == TSDBConstants.JNI_CONNECTION_NULL) {
e.printStackTrace();
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_ENCODING);
}
if (stmt == TSDBConstants.JNI_CONNECTION_NULL) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
} } else if (stmt == TSDBConstants.JNI_SQL_NULL) {
if (stmt == TSDBConstants.JNI_SQL_NULL) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_SQL_NULL); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_SQL_NULL);
} } else if (stmt == TSDBConstants.JNI_OUT_OF_MEMORY) {
if (stmt == TSDBConstants.JNI_OUT_OF_MEMORY) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_OUT_OF_MEMORY); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_OUT_OF_MEMORY);
} }
......
...@@ -2490,7 +2490,6 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) { ...@@ -2490,7 +2490,6 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) {
tmp += POINTER_BYTES * pCtx->param[0].i64; tmp += POINTER_BYTES * pCtx->param[0].i64;
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
// assert(pCtx->param[0].i64 > 0);
for (int32_t i = 0; i < pCtx->param[0].i64; ++i) { for (int32_t i = 0; i < pCtx->param[0].i64; ++i) {
pTopBotInfo->res[i] = (tValuePair*) tmp; pTopBotInfo->res[i] = (tValuePair*) tmp;
...@@ -2499,7 +2498,6 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) { ...@@ -2499,7 +2498,6 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) {
} }
} }
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval) { bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo == NULL) { if (pResInfo == NULL) {
...@@ -2579,13 +2577,14 @@ static void top_function(SQLFunctionCtx *pCtx) { ...@@ -2579,13 +2577,14 @@ static void top_function(SQLFunctionCtx *pCtx) {
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i); char *data = GET_INPUT_DATA(pCtx, i);
TSKEY ts = GET_TS_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) { if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue; continue;
} }
notNullElems++; notNullElems++;
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
do_top_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); do_top_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
} }
...@@ -2658,13 +2657,13 @@ static void bottom_function(SQLFunctionCtx *pCtx) { ...@@ -2658,13 +2657,13 @@ static void bottom_function(SQLFunctionCtx *pCtx) {
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i); char *data = GET_INPUT_DATA(pCtx, i);
TSKEY ts = GET_TS_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) { if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue; continue;
} }
notNullElems++; notNullElems++;
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
} }
...@@ -2742,7 +2741,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2742,7 +2741,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; __compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn;
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
} else if (pCtx->param[1].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX) { } else /*if (pCtx->param[1].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/ {
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; __compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn;
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
} }
......
...@@ -950,7 +950,13 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, ...@@ -950,7 +950,13 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
uint32_t status = aAggs[pCtx[i].functionId].status; uint32_t status = aAggs[pCtx[i].functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
// In case of the top/bottom query again the nest query result, which has no timestamp column
// don't set the ptsList attribute.
if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
pCtx[i].ptsList = (int64_t*) tsInfo->pData; pCtx[i].ptsList = (int64_t*) tsInfo->pData;
} else {
pCtx[i].ptsList = NULL;
}
} }
} else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
...@@ -4228,6 +4234,10 @@ static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBl ...@@ -4228,6 +4234,10 @@ static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBl
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step; pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step;
if (pTableQueryInfo->pTable == NULL) {
return;
}
STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo); STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo);
STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid)); STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
if (idinfo != NULL) { if (idinfo != NULL) {
...@@ -4905,8 +4915,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { ...@@ -4905,8 +4915,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) {
if (pTableQueryInfo != NULL) { // TODO refactor
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order); updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
} }
...@@ -4949,8 +4958,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { ...@@ -4949,8 +4958,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) {
if (pTableQueryInfo != NULL) { // TODO refactor
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order); updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
} }
......
...@@ -24,6 +24,9 @@ sql drop database if exists $db ...@@ -24,6 +24,9 @@ sql drop database if exists $db
sql create database $db keep 36500 sql create database $db keep 36500
sql use $db sql use $db
print =====================================> td-4481
sql create database $db
print =====================================> test case for twa in single block print =====================================> test case for twa in single block
sql create table t1 (ts timestamp, k float); sql create table t1 (ts timestamp, k float);
......
...@@ -9,7 +9,7 @@ sql connect ...@@ -9,7 +9,7 @@ sql connect
print ======================== dnode1 start print ======================== dnode1 start
$dbPrefix = nest_query $dbPrefix = nest_db
$tbPrefix = nest_tb $tbPrefix = nest_tb
$mtPrefix = nest_mt $mtPrefix = nest_mt
$tbNum = 10 $tbNum = 10
...@@ -17,7 +17,6 @@ $rowNum = 10000 ...@@ -17,7 +17,6 @@ $rowNum = 10000
$totalNum = $tbNum * $rowNum $totalNum = $tbNum * $rowNum
print =============== nestquery.sim print =============== nestquery.sim
$i = 0 $i = 0
$db = $dbPrefix . $i $db = $dbPrefix . $i
$mt = $mtPrefix . $i $mt = $mtPrefix . $i
......
...@@ -60,4 +60,5 @@ run general/parser/slimit_alter_tags.sim ...@@ -60,4 +60,5 @@ run general/parser/slimit_alter_tags.sim
run general/parser/binary_escapeCharacter.sim run general/parser/binary_escapeCharacter.sim
run general/parser/between_and.sim run general/parser/between_and.sim
run general/parser/last_cache.sim run general/parser/last_cache.sim
run general/parser/nestquery.sim
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册