未验证 提交 b364ae65 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6519 from taosdata/feature/query

[td-4739]<fix>: fix bug in derivative.
...@@ -367,6 +367,7 @@ static bool bnMonitorBalance() { ...@@ -367,6 +367,7 @@ static bool bnMonitorBalance() {
for (int32_t dest = 0; dest < src; dest++) { for (int32_t dest = 0; dest < src; dest++) {
SDnodeObj *pDestDnode = tsBnDnodes.list[dest]; SDnodeObj *pDestDnode = tsBnDnodes.list[dest];
if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) continue; if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) continue;
if (taosGetTimestampMs() - pDestDnode->createdTime < 2000) continue;
float destScore = bnTryCalcDnodeScore(pDestDnode, 1); float destScore = bnTryCalcDnodeScore(pDestDnode, 1);
if (srcScore + 0.0001 < destScore) continue; if (srcScore + 0.0001 < destScore) continue;
......
...@@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i ...@@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/ */
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsIrateQuery(SQueryInfo* pQueryInfo);
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo); bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo); bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
......
...@@ -423,18 +423,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -423,18 +423,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if(code != TSDB_CODE_SUCCESS) { if(code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
if (pInfo->pMiscInfo->tableType == TSDB_SUPER_TABLE) {
//// code = tscGetTableMeta(pSql, pTableMetaInfo);
//// if (code != TSDB_CODE_SUCCESS) {
//// return code;
//// }
//
// if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
// }
}
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) { } else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
if (pzName->type == TK_STRING) { if (pzName->type == TK_STRING) {
pzName->n = strdequote(pzName->z); pzName->n = strdequote(pzName->z);
...@@ -772,8 +760,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -772,8 +760,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pCmd->active = pCmd->pQueryInfo; pCmd->active = pCmd->pQueryInfo;
pCmd->command = pCmd->pQueryInfo->command; pCmd->command = pCmd->pQueryInfo->command;
if (pTableMetaInfo->pTableMeta != NULL) { STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pCmd->active, 0);
pSql->res.precision = tscGetTableInfo(pTableMetaInfo->pTableMeta).precision; if (pTableMetaInfo1->pTableMeta != NULL) {
pSql->res.precision = tscGetTableInfo(pTableMetaInfo1->pTableMeta).precision;
} }
return TSDB_CODE_SUCCESS; // do not build query message here return TSDB_CODE_SUCCESS; // do not build query message here
...@@ -2026,8 +2015,10 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -2026,8 +2015,10 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema colSchema = *tGetTbnameColumnSchema(); SSchema colSchema = *tGetTbnameColumnSchema();
getColumnName(pItem, colSchema.name, colSchema.name, sizeof(colSchema.name) - 1); char name[TSDB_COL_NAME_LEN] = {0};
getColumnName(pItem, name, colSchema.name, sizeof(colSchema.name) - 1);
tstrncpy(colSchema.name, name, TSDB_COL_NAME_LEN);
/*SExprInfo* pExpr = */tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG, getNewResColId(pCmd)); /*SExprInfo* pExpr = */tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG, getNewResColId(pCmd));
} else { } else {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
...@@ -3066,8 +3057,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -3066,8 +3057,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
} }
bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "TWA/Diff not allowed to apply to super table directly"; const char* msg1 = "TWA/Diff/Derivative/Irate not allowed to apply to super table directly";
const char* msg2 = "TWA/Diff only support group by tbname for super table query"; const char* msg2 = "TWA/Diff/Derivative/Irate only support group by tbname for super table query";
const char* msg3 = "function not support for super table query"; const char* msg3 = "function not support for super table query";
// filter sql function not supported by metric query yet. // filter sql function not supported by metric query yet.
...@@ -3080,7 +3071,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) ...@@ -3080,7 +3071,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
} }
} }
if (tscIsTWAQuery(pQueryInfo) || tscIsDiffDerivQuery(pQueryInfo)) { if (tscIsTWAQuery(pQueryInfo) || tscIsDiffDerivQuery(pQueryInfo) || tscIsIrateQuery(pQueryInfo)) {
if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return true; return true;
...@@ -7791,7 +7782,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7791,7 +7782,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
} }
// todo derivate funtion requires ts column exists in subquery // todo derivative function requires ts column exists in subquery
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0); SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0);
......
...@@ -283,6 +283,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -283,6 +283,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
SArray* tables = getTableList(pSql); SArray* tables = getTableList(pSql);
if (tables == NULL) { if (tables == NULL) {
pSub->lastSyncTime = 0; //force to get table list next time
return 0; return 0;
} }
size_t numOfTables = taosArrayGetSize(tables); size_t numOfTables = taosArrayGetSize(tables);
...@@ -489,7 +490,15 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -489,7 +490,15 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSub *pSub = (SSub *)tsub; SSub *pSub = (SSub *)tsub;
if (pSub == NULL) return NULL; if (pSub == NULL) return NULL;
if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { if (pSub->pTimer == NULL) {
int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime;
if (duration < (int64_t)(pSub->interval)) {
tscDebug("subscription consume too frequently, blocking...");
taosMsleep(pSub->interval - (int32_t)duration);
}
}
if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { //may reach here when retireve stable vgroup failed
SSqlObj* pSql = recreateSqlObj(pSub); SSqlObj* pSql = recreateSqlObj(pSub);
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
...@@ -503,6 +512,11 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -503,6 +512,11 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSub->pSql = pSql; pSub->pSql = pSql;
pSql->pSubscription = pSub; pSql->pSubscription = pSub;
// no table list now, force to update it
tscDebug("begin table synchronization");
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
tscDebug("table synchronization completed");
} }
tscSaveSubscriptionProgress(pSub); tscSaveSubscriptionProgress(pSub);
...@@ -527,14 +541,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -527,14 +541,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey); tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
} }
if (pSub->pTimer == NULL) {
int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime;
if (duration < (int64_t)(pSub->interval)) {
tscDebug("subscription consume too frequently, blocking...");
taosMsleep(pSub->interval - (int32_t)duration);
}
}
size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo); size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo);
size += sizeof(SQueryTableMsg) + 4096; size += sizeof(SQueryTableMsg) + 4096;
int code = tscAllocPayload(&pSql->cmd, (int)size); int code = tscAllocPayload(&pSql->cmd, (int)size);
......
...@@ -460,6 +460,22 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { ...@@ -460,6 +460,22 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
return false; return false;
} }
bool tscIsIrateQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr == NULL) {
continue;
}
if (pExpr->base.functionId == TSDB_FUNC_IRATE) {
return true;
}
}
return false;
}
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) { bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->sessionWindow.gap > 0; return pQueryInfo->sessionWindow.gap > 0;
} }
...@@ -3477,6 +3493,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3477,6 +3493,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
if (pSql->res.code == TSDB_CODE_SUCCESS) { if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); (*pSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
} else { } else {
pParentSql->res.code = pSql->res.code;
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
} }
} }
......
...@@ -27,6 +27,7 @@ void dnodeUpdateCfg(SDnodeCfg *cfg); ...@@ -27,6 +27,7 @@ void dnodeUpdateCfg(SDnodeCfg *cfg);
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
void dnodeGetClusterId(char *clusterId); void dnodeGetClusterId(char *clusterId);
void dnodeGetCfg(int32_t *dnodeId, char *clusterId); void dnodeGetCfg(int32_t *dnodeId, char *clusterId);
void dnodeSetDropped();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
static SDnodeCfg tsCfg = {0}; static SDnodeCfg tsCfg = {0};
static pthread_mutex_t tsCfgMutex; static pthread_mutex_t tsCfgMutex;
static int32_t tsDnodeDropped;
static int32_t dnodeReadCfg(); static int32_t dnodeReadCfg();
static int32_t dnodeWriteCfg(); static int32_t dnodeWriteCfg();
...@@ -34,6 +35,10 @@ int32_t dnodeInitCfg() { ...@@ -34,6 +35,10 @@ int32_t dnodeInitCfg() {
if (ret == 0) { if (ret == 0) {
dInfo("dnode cfg is initialized"); dInfo("dnode cfg is initialized");
} }
if (tsDnodeDropped) {
dInfo("dnode is dropped, exiting");
return -1;
}
return ret; return ret;
} }
...@@ -44,6 +49,14 @@ void dnodeUpdateCfg(SDnodeCfg *cfg) { ...@@ -44,6 +49,14 @@ void dnodeUpdateCfg(SDnodeCfg *cfg) {
dnodeResetCfg(cfg); dnodeResetCfg(cfg);
} }
void dnodeSetDropped() {
pthread_mutex_lock(&tsCfgMutex);
tsDnodeDropped = 1;
dnodeWriteCfg();
pthread_mutex_unlock(&tsCfgMutex);
}
int32_t dnodeGetDnodeId() { int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0; int32_t dnodeId = 0;
pthread_mutex_lock(&tsCfgMutex); pthread_mutex_lock(&tsCfgMutex);
...@@ -119,6 +132,14 @@ static int32_t dnodeReadCfg() { ...@@ -119,6 +132,14 @@ static int32_t dnodeReadCfg() {
} }
cfg.dnodeId = (int32_t)dnodeId->valueint; cfg.dnodeId = (int32_t)dnodeId->valueint;
cJSON *dnodeDropped = cJSON_GetObjectItem(root, "dnodeDropped");
if (!dnodeDropped || dnodeDropped->type != cJSON_Number) {
dError("failed to read %s, dnodeDropped not found", file);
//goto PARSE_CFG_OVER;
} else {
tsDnodeDropped = (int32_t)dnodeDropped->valueint;
}
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) { if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s, clusterId not found", file); dError("failed to read %s, clusterId not found", file);
...@@ -154,6 +175,7 @@ static int32_t dnodeWriteCfg() { ...@@ -154,6 +175,7 @@ static int32_t dnodeWriteCfg() {
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsCfg.dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsCfg.dnodeId);
len += snprintf(content + len, maxLen - len, " \"dnodeDropped\": %d,\n", tsDnodeDropped);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsCfg.clusterId); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsCfg.clusterId);
len += snprintf(content + len, maxLen - len, "}\n"); len += snprintf(content + len, maxLen - len, "}\n");
......
...@@ -202,6 +202,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -202,6 +202,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
char clusterId[TSDB_CLUSTER_ID_LEN]; char clusterId[TSDB_CLUSTER_ID_LEN];
dnodeGetClusterId(clusterId); dnodeGetClusterId(clusterId);
if (clusterId[0] != '\0') { if (clusterId[0] != '\0') {
dnodeSetDropped();
dError("exit zombie dropped dnode"); dError("exit zombie dropped dnode");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
......
...@@ -138,7 +138,6 @@ typedef struct SQueryInfo { ...@@ -138,7 +138,6 @@ typedef struct SQueryInfo {
bool hasFilter; bool hasFilter;
bool onlyTagQuery; bool onlyTagQuery;
bool orderProjectQuery; bool orderProjectQuery;
// bool diffQuery;
bool stateWindow; bool stateWindow;
} SQueryInfo; } SQueryInfo;
......
...@@ -3613,16 +3613,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -3613,16 +3613,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
qError("error input type"); qError("error input type");
} }
// initial value is not set yet, all data block are null
if (!pDerivInfo->valueSet || notNullElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
*/
assert(pCtx->hasNull);
} else {
GET_RES_INFO(pCtx)->numOfRes += notNullElems; GET_RES_INFO(pCtx)->numOfRes += notNullElems;
}
} }
#define DIFF_IMPL(ctx, d, type) \ #define DIFF_IMPL(ctx, d, type) \
......
...@@ -1312,6 +1312,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1312,6 +1312,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
} }
} }
// todo opt perf
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
pInfo->binfo.pCtx[k].size = 1; pInfo->binfo.pCtx[k].size = 1;
int32_t functionId = pInfo->binfo.pCtx[k].functionId; int32_t functionId = pInfo->binfo.pCtx[k].functionId;
......
...@@ -817,6 +817,9 @@ print ====================> TODO stddev + normal column filter ...@@ -817,6 +817,9 @@ print ====================> TODO stddev + normal column filter
print ====================> irate print ====================> irate
sql_error select irate(f1) from st1;
sql select irate(f1) from st1 group by tbname;
sql select irate(k) from t1 sql select irate(k) from t1
if $rows != 1 then if $rows != 1 then
return -1 return -1
...@@ -1073,6 +1076,12 @@ sql insert into t0 values('2020-1-1 1:3:9', 9); ...@@ -1073,6 +1076,12 @@ sql insert into t0 values('2020-1-1 1:3:9', 9);
sql insert into t0 values('2020-1-1 1:4:10', 10); sql insert into t0 values('2020-1-1 1:4:10', 10);
sql insert into t1 values('2020-1-1 1:1:2', 2); sql insert into t1 values('2020-1-1 1:1:2', 2);
print ===========================>td-4739
sql select diff(val) from (select derivative(k, 1s, 0) val from t1);
if $rows != 0 then
return -1
endi
sql insert into t1 values('2020-1-1 1:1:4', 20); sql insert into t1 values('2020-1-1 1:1:4', 20);
sql insert into t1 values('2020-1-1 1:1:6', 200); sql insert into t1 values('2020-1-1 1:1:6', 200);
sql insert into t1 values('2020-1-1 1:1:8', 2000); sql insert into t1 values('2020-1-1 1:1:8', 2000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册