提交 89abdd98 编写于 作者: H Haojun Liao

[td-4555]<feature>: add derivative function

上级 0e223ea2
......@@ -214,7 +214,7 @@ void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId);
void tscDequoteAndTrimToken(SStrToken* pToken);
int32_t tscValidateName(SStrToken* pToken);
......
......@@ -2157,11 +2157,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_MIN:
case TSDB_FUNC_MAX:
case TSDB_FUNC_DIFF:
case TSDB_FUNC_DERIVATIVE:
case TSDB_FUNC_STDDEV:
case TSDB_FUNC_LEASTSQR: {
// 1. valid the number of parameters
if (pItem->pNode->pParam == NULL || (functionId != TSDB_FUNC_LEASTSQR && taosArrayGetSize(pItem->pNode->pParam) != 1) ||
(functionId == TSDB_FUNC_LEASTSQR && taosArrayGetSize(pItem->pNode->pParam) != 3)) {
int32_t numOfParams = taosArrayGetSize(pItem->pNode->pParam);
if (pItem->pNode->pParam == NULL ||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) {
/* no parameters or more than one parameter for function */
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -2182,11 +2185,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// 2. check if sql function can be applied on this column data type
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
if (!IS_NUMERIC_TYPE(pSchema->type)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && functionId == TSDB_FUNC_DIFF) {
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
......@@ -2200,11 +2205,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
// set the first column ts for diff query
if (functionId == TSDB_FUNC_DIFF) {
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
colIndex += 1;
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE,
getNewResColId(pCmd), TSDB_KEYSIZE, false);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false);
SColumnList ids = createColumnList(1, 0, 0);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
......@@ -2230,12 +2235,29 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_TSC_INVALID_OPERATION;
}
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES);
} else if (functionId == TSDB_FUNC_IRATE) {
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int64_t prec = info.precision;
tscExprAddParams(&pExpr->base, (char*)&prec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
} else if (functionId == TSDB_FUNC_DERIVATIVE) {
char val[8] = {0};
int64_t tickPerSec = 0;
if (tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (info.precision == TSDB_TIME_PRECISION_MILLI) {
tickPerSec /= 1000;
}
tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
memset(val, 0, tListLen(val));
if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
}
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
......
......@@ -1621,7 +1621,7 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
uint64_t localQueryId = pSql->self;
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo);
convertQueryResult(pRes, pQueryInfo, pSql->self);
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
......
......@@ -1977,9 +1977,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);
tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
tscDebug("0x%"PRIx64" start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
......
......@@ -491,7 +491,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
return false;
}
if (/*tscGroupbyColumn(pQueryInfo) || */isTsCompQuery(pQueryInfo) || tscIsTopBotQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) {
if (tscIsDiffQuery(pQueryInfo)) {
return false;
}
......@@ -507,13 +507,13 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
continue;
}
if (!IS_MULTIOUTPUT(aAggs[functionId].status)) {
if ((!IS_MULTIOUTPUT(aAggs[functionId].status)) ||
(functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TS_COMP)) {
return true;
}
}
return false;
}
bool isBlockDistQuery(SQueryInfo* pQueryInfo) {
......@@ -1046,7 +1046,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp
return pOperator;
}
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId) {
// set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
......@@ -1056,6 +1056,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
tscSetResRawPtrRv(pRes, pQueryInfo, p);
}
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
pRes->row = 0;
pRes->completed = (pRes->numOfRows == 0);
}
......@@ -1172,7 +1173,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
uint64_t qId = pSql->self;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pOutput, px);
convertQueryResult(pOutput, px, pSql->self);
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
......@@ -2171,6 +2172,7 @@ size_t tscNumOfExprs(SQueryInfo* pQueryInfo) {
return taosArrayGetSize(pQueryInfo->exprList);
}
// todo REFACTOR
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) {
assert (pExpr != NULL || argument != NULL || bytes != 0);
......
......@@ -66,17 +66,19 @@ extern "C" {
#define TSDB_FUNC_RATE 29
#define TSDB_FUNC_IRATE 30
#define TSDB_FUNC_TID_TAG 31
#define TSDB_FUNC_BLKINFO 32
#define TSDB_FUNC_HISTOGRAM 33
#define TSDB_FUNC_HLL 34
#define TSDB_FUNC_MODE 35
#define TSDB_FUNC_SAMPLE 36
#define TSDB_FUNC_CEIL 37
#define TSDB_FUNC_FLOOR 38
#define TSDB_FUNC_ROUND 39
#define TSDB_FUNC_MAVG 40
#define TSDB_FUNC_CSUM 41
#define TSDB_FUNC_DERIVATIVE 32
#define TSDB_FUNC_BLKINFO 33
#define TSDB_FUNC_HISTOGRAM 34
#define TSDB_FUNC_HLL 35
#define TSDB_FUNC_MODE 36
#define TSDB_FUNC_SAMPLE 37
#define TSDB_FUNC_CEIL 38
#define TSDB_FUNC_FLOOR 39
#define TSDB_FUNC_ROUND 40
#define TSDB_FUNC_MAVG 41
#define TSDB_FUNC_CSUM 42
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......
......@@ -246,6 +246,7 @@ typedef struct SQueryRuntimeEnv {
void* pQueryHandle;
int32_t prevGroupId; // previous executed group id
bool enableGroupData;
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
......
......@@ -161,6 +161,14 @@ typedef struct SRateInfo {
bool isIRate; // true for IRate functions, false for Rate functions
} SRateInfo;
typedef struct SDerivInfo {
double prevValue; // previous value
TSKEY prevTs; // previous timestamp
bool ignoreNegative;// ignore the negative value
int64_t tsWindow; // time window for derivative
bool valueSet; // the value has been set already
} SDerivInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable) {
if (!isValidDataType(dataType)) {
......@@ -189,7 +197,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
*interBytes = 0;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_BLKINFO) {
}
if (functionId == TSDB_FUNC_BLKINFO) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = 16384;
*interBytes = 0;
......@@ -217,6 +227,14 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_DERIVATIVE) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double); // this results is compressed ts data, only one byte
*interBytes = sizeof(SDerivInfo);
return TSDB_CODE_SUCCESS;
}
//TODO twa function definit error.
if (isSuperTable) {
if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = TSDB_DATA_TYPE_BINARY;
......@@ -3402,6 +3420,256 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx) {
return false;
}
#define DIFF_IMPL(ctx, d, type) \
do { \
if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].i64 = *(type *)(d); \
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \
*(type *)(&(ctx)->param[1].i64) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
} \
} while (0);
static bool deriv_function_setup(SQLFunctionCtx *pCtx) {
if (!function_setup(pCtx)) {
return false;
}
// diff function require the value is set to -1
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDerivInfo->ignoreNegative = pCtx->param[2].i64;
pDerivInfo->prevTs = -1;
pDerivInfo->tsWindow = pCtx->param[0].i64;
pDerivInfo->valueSet = false;
return false;
}
static void deriv_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
void *data = GET_INPUT_DATA_LIST(pCtx);
bool isFirstBlock = (pDerivInfo->valueSet == false);
int32_t notNullElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY *pTimestamp = pCtx->ptsOutputBuf;
TSKEY *tsList = GET_TS_LIST(pCtx);
double *pOutput = (double *)pCtx->pOutput;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: {
int32_t *pData = (int32_t *)data;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue;
}
if (!pDerivInfo->valueSet) { // initial value is not set yet
pDerivInfo->valueSet = true;
} else {
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
};
case TSDB_DATA_TYPE_BIGINT: {
int64_t *pData = (int64_t *)data;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue;
}
if (!pDerivInfo->valueSet) { // initial value is not set yet
pDerivInfo->valueSet = true;
} else {
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *pData = (double *)data;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue;
}
if (!pDerivInfo->valueSet) { // initial value is not set yet
pDerivInfo->valueSet = true;
} else {
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *pData = (float *)data;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue;
}
if (!pDerivInfo->valueSet) { // initial value is not set yet
pDerivInfo->valueSet = true;
} else {
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *pData = (int16_t *)data;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue;
}
if (!pDerivInfo->valueSet) { // initial value is not set yet
pDerivInfo->valueSet = true;
} else {
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t *pData = (int8_t *)data;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) {
continue;
}
if (!pDerivInfo->valueSet) { // initial value is not set yet
pDerivInfo->valueSet = true;
} else {
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
default:
qError("error input type");
}
// initial value is not set yet, all data block are null
if (!pDerivInfo->valueSet || notNullElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
*/
assert(pCtx->hasNull);
} else {
int32_t forwardStep = (isFirstBlock) ? notNullElems - 1 : notNullElems;
GET_RES_INFO(pCtx)->numOfRes += forwardStep;
}
}
#define DIFF_IMPL(ctx, d, type) \
do { \
if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].i64 = *(type *)(d); \
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \
*(type *)(&(ctx)->param[1].i64) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
} \
} while (0);
#define DIFF_IMPL(ctx, d, type) \
do { \
if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].i64 = *(type *)(d); \
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \
*(type *)(&(ctx)->param[1].i64) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
} \
} while (0);
// TODO difference in date column
static void diff_function(SQLFunctionCtx *pCtx) {
void *data = GET_INPUT_DATA_LIST(pCtx);
......@@ -5315,8 +5583,20 @@ SAggFunctionInfo aAggs[] = {{
noop1,
dataBlockRequired,
},
{ //32
"derivative", // return table id and the corresponding tags for join match and subscribe
TSDB_FUNC_DERIVATIVE,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
deriv_function_setup,
deriv_function,
noop2,
doFinalizer,
noop1,
dataBlockRequired,
},
{
// 32
// 33
"_block_dist", // return table id and the corresponding tags for join match and subscribe
TSDB_FUNC_BLKINFO,
TSDB_FUNC_BLKINFO,
......
......@@ -1681,6 +1681,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->enableGroupData = false;
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
......@@ -3119,8 +3121,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
assert(pCtx[i].pOutput != NULL);
// set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
int32_t fid = pCtx[i].functionId;
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE) {
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
}
}
......@@ -4274,8 +4276,10 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
pRuntimeEnv->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo);
if (pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) {
*newgroup = false;
if (pRuntimeEnv->enableGroupData) {
if(pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) {
*newgroup = true;
}
}
pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex;
......@@ -4449,6 +4453,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
pInfo->current = 0;
pInfo->prevGroupId = -1;
pRuntimeEnv->enableGroupData = true;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableSeqScanOperator";
......
......@@ -1811,10 +1811,6 @@ if $data09 != 3 then
return -1
endi
sql select st0.*,st1.* from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts and st1.ts=st0.ts and st0.id1=st1.id1 order by st0.ts limit 5 offset 5
if $rows != 5 then
return -1
......@@ -2294,7 +2290,6 @@ if $data19 != 9925 then
return -1
endi
sql_error select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.f1=tb1_1.f1;
sql_error select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.ts=tb1_1.ts and tb0_1.id1=tb1_1.id2;
sql_error select tb0_5.*, tb1_5.*,tb2_5.*,tb3_5.*,tb4_5.*,tb5_5.*, tb6_5.*,tb7_5.*,tb8_5.*,tb9_5.*,tba_5.* from tb0_5, tb1_5, tb2_5, tb3_5, tb4_5,tb5_5, tb6_5, tb7_5, tb8_5, tb9_5, tba_5 where tb9_5.ts=tb8_5.ts and tb8_5.ts=tb7_5.ts and tb7_5.ts=tb6_5.ts and tb6_5.ts=tb5_5.ts and tb5_5.ts=tb4_5.ts and tb4_5.ts=tb3_5.ts and tb3_5.ts=tb2_5.ts and tb2_5.ts=tb1_5.ts and tb1_5.ts=tb0_5.ts and tb0_5.ts=tba_5.ts;
......@@ -2317,10 +2312,4 @@ sql_error select last(*) from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 g
sql_error select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9 where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1;
sql_error select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9,sta where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.ts=st1.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1 and st0.id1=st1.id1 and st0.id1=sta.id1 and st0.ts=sta.ts;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -39,6 +39,7 @@ run general/parser/slimit1.sim
run general/parser/slimit_alter_tags.sim
run general/parser/tbnameIn.sim
run general/parser/join.sim
#run general/parser/join_multitables.sim
run general/parser/join_multivnode.sim
run general/parser/join_manyblocks.sim
run general/parser/projection_limit_offset.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册