diff --git a/src/balance/src/bnMain.c b/src/balance/src/bnMain.c index 67741b147328cca844b796399d599a7ad92c8a7b..f022fff6d83049dbf75ce73f161983f28322304b 100644 --- a/src/balance/src/bnMain.c +++ b/src/balance/src/bnMain.c @@ -367,6 +367,7 @@ static bool bnMonitorBalance() { for (int32_t dest = 0; dest < src; dest++) { SDnodeObj *pDestDnode = tsBnDnodes.list[dest]; if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) continue; + if (taosGetTimestampMs() - pDestDnode->createdTime < 2000) continue; float destScore = bnTryCalcDnodeScore(pDestDnode, 1); if (srcScore + 0.0001 < destScore) continue; diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index a9ac788bc3065f7230e10f1c58273ef90f70c6fb..35f3b42811a1cd90dfb87cbc7695cfcd55c50fa4 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i */ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo); +bool tscIsIrateQuery(SQueryInfo* pQueryInfo); + bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo); bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 89d915a813f67915bf4e2210a9eacb8515c54f54..bac8920d8f1d17cb53756378724a56b37dead866 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -906,7 +906,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { int code = doBindParam(pBlock, data, param, &bind[param->idx], 1); if (code != TSDB_CODE_SUCCESS) { - tscDebug("0x%"PRIx64" bind column %d: type mismatch or invalid", pStmt->pSql->self, param->idx); + tscDebug("0x%"PRIx64" bind column %d: type mismatch or invalid", pStmt->pSql->self, param->idx); return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "bind column type mismatch or invalid"); } } @@ -1256,7 +1256,7 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - if (sToken.n <= 0 || sToken.type != TK_USING) { + if (sToken.n <= 0 || sToken.type != TK_USING) { tscError("keywords USING is expected, sql:%s", pCmd->insertParam.sql); return tscSQLSyntaxErrMsg(pCmd->payload, "keywords USING is expected", sToken.z ? sToken.z : pCmd->insertParam.sql); } @@ -1403,7 +1403,7 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO } else { tfree(pSql->sqlstr); } - + pSql->sqlstr = str; return TSDB_CODE_SUCCESS; @@ -1415,7 +1415,7 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO TAOS_STMT* taos_stmt_init(TAOS* taos) { STscObj* pObj = (STscObj*)taos; STscStmt* pStmt = NULL; - + if (pObj == NULL || pObj->signature != pObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; tscError("connection disconnected"); @@ -1469,7 +1469,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { tscError("sql is NULL"); STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "sql is NULL")); } - + if (pStmt->last != STMT_INIT) { tscError("prepare status error, last:%d", pStmt->last); STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "prepare status error")); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index d0bd767a4625f57695e819b7ac9c46557ab72c4f..0ab7c38186f1c5b0d9260eea2c1a3fc18d2e65de 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -147,57 +147,57 @@ int16_t getNewResColId(SSqlCmd* pCmd) { return pCmd->resColumnId--; } -// serialize expr in exprlist to binary +// serialize expr in exprlist to binary // formate "type | size | value" bool serializeExprListToVariant(SArray* pList, tVariant **dst, int16_t colType) { bool ret = false; if (!pList || pList->size <= 0 || colType < 0) { return ret; - } - - tSqlExprItem* item = (tSqlExprItem *)taosArrayGet(pList, 0); - int32_t firstTokenType = item->pNode->token.type; + } + + tSqlExprItem* item = (tSqlExprItem *)taosArrayGet(pList, 0); + int32_t firstTokenType = item->pNode->token.type; int32_t type = firstTokenType; - //nchar to binary and - toTSDBType(type); + //nchar to binary and + toTSDBType(type); if (type != colType && (type != TSDB_DATA_TYPE_BINARY || colType != TSDB_DATA_TYPE_NCHAR)) { - return false; - } - type = colType; - + return false; + } + type = colType; + SBufferWriter bw = tbufInitWriter( NULL, false ); tbufEnsureCapacity(&bw, 512); int32_t size = (int32_t)(pList->size); - tbufWriteUint32(&bw, type); + tbufWriteUint32(&bw, type); tbufWriteInt32(&bw, size); - + for (int32_t i = 0; i < size; i++) { tSqlExpr* pSub = ((tSqlExprItem*)(taosArrayGet(pList, i)))->pNode; // check all the token type in expr list same or not if (firstTokenType != pSub->token.type) { break; - } + } - toTSDBType(pSub->token.type); + toTSDBType(pSub->token.type); - tVariant var; - tVariantCreate(&var, &pSub->token); - if (type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_SMALLINT + tVariant var; + tVariantCreate(&var, &pSub->token); + if (type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_SMALLINT || type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_INT) { - tbufWriteInt64(&bw, var.i64); + tbufWriteInt64(&bw, var.i64); } else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) { tbufWriteDouble(&bw, var.dKey); } else if (type == TSDB_DATA_TYPE_BINARY){ tbufWriteBinary(&bw, var.pz, var.nLen); } else if (type == TSDB_DATA_TYPE_NCHAR) { - char *buf = (char *)calloc(1, (var.nLen + 1)*TSDB_NCHAR_SIZE); + char *buf = (char *)calloc(1, (var.nLen + 1)*TSDB_NCHAR_SIZE); if (tVariantDump(&var, buf, type, false) != TSDB_CODE_SUCCESS) { free(buf); tVariantDestroy(&var); - break; + break; } tbufWriteBinary(&bw, buf, twcslen((wchar_t *)buf) * TSDB_NCHAR_SIZE); free(buf); @@ -209,21 +209,21 @@ bool serializeExprListToVariant(SArray* pList, tVariant **dst, int16_t colType) if (ret == true) { if ((*dst = calloc(1, sizeof(tVariant))) != NULL) { - tVariantCreateFromBinary(*dst, tbufGetData(&bw, false), tbufTell(&bw), TSDB_DATA_TYPE_BINARY); + tVariantCreateFromBinary(*dst, tbufGetData(&bw, false), tbufTell(&bw), TSDB_DATA_TYPE_BINARY); } else { ret = false; } } - tbufCloseWriter(&bw); + tbufCloseWriter(&bw); return ret; } static int32_t validateParamOfRelationIn(tVariant *pVar, int32_t colType) { if (pVar->nType != TSDB_DATA_TYPE_BINARY) { - return -1; + return -1; } - SBufferReader br = tbufInitReader(pVar->pz, pVar->nLen, false); - return colType == TSDB_DATA_TYPE_NCHAR ? 0 : (tbufReadUint32(&br) == colType ? 0: -1); + SBufferReader br = tbufInitReader(pVar->pz, pVar->nLen, false); + return colType == TSDB_DATA_TYPE_NCHAR ? 0 : (tbufReadUint32(&br) == colType ? 0: -1); } static uint8_t convertOptr(SStrToken *pToken) { @@ -423,18 +423,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { if(code != TSDB_CODE_SUCCESS) { return code; } - - if (pInfo->pMiscInfo->tableType == TSDB_SUPER_TABLE) { -//// code = tscGetTableMeta(pSql, pTableMetaInfo); -//// if (code != TSDB_CODE_SUCCESS) { -//// return code; -//// } -// -// if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { -// return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4); -// } - } - } else if (pInfo->type == TSDB_SQL_DROP_DNODE) { if (pzName->type == TK_STRING) { pzName->n = strdequote(pzName->z); @@ -507,7 +495,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { break; } - + case TSDB_SQL_CREATE_DNODE: { const char* msg = "invalid host name (ip address)"; @@ -772,8 +760,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { pCmd->active = pCmd->pQueryInfo; pCmd->command = pCmd->pQueryInfo->command; - if (pTableMetaInfo->pTableMeta != NULL) { - pSql->res.precision = tscGetTableInfo(pTableMetaInfo->pTableMeta).precision; + STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pCmd->active, 0); + if (pTableMetaInfo1->pTableMeta != NULL) { + pSql->res.precision = tscGetTableInfo(pTableMetaInfo1->pTableMeta).precision; } return TSDB_CODE_SUCCESS; // do not build query message here @@ -807,12 +796,12 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { } break; } - case TSDB_SQL_COMPACT_VNODE:{ + case TSDB_SQL_COMPACT_VNODE:{ const char* msg = "invalid compact"; if (setCompactVnodeInfo(pSql, pInfo) != TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg); - } - break; + } + break; } default: return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression"); @@ -965,15 +954,15 @@ static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS SStrToken *col = &(pSqlNode->windowstateVal.col) ; if (col->z == NULL || col->n <= 0) { - return TSDB_CODE_SUCCESS; - } + return TSDB_CODE_SUCCESS; + } if (pQueryInfo->colList == NULL) { pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); } if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); - } + } pQueryInfo->groupbyExpr.numOfGroupCols = 1; //TODO(dengyihao): check tag column @@ -984,7 +973,7 @@ static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS SColumnIndex index = COLUMN_INDEX_INITIALIZER; if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); - } + } STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; @@ -997,7 +986,7 @@ static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS if (pGroupExpr->columnInfo == NULL) { pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex)); } - + SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex); if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); @@ -1049,7 +1038,7 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS } if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); - } + } pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; @@ -2026,8 +2015,10 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { SSchema colSchema = *tGetTbnameColumnSchema(); - getColumnName(pItem, colSchema.name, colSchema.name, sizeof(colSchema.name) - 1); + char name[TSDB_COL_NAME_LEN] = {0}; + getColumnName(pItem, name, colSchema.name, sizeof(colSchema.name) - 1); + tstrncpy(colSchema.name, name, TSDB_COL_NAME_LEN); /*SExprInfo* pExpr = */tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG, getNewResColId(pCmd)); } else { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); @@ -2341,7 +2332,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col tickPerSec /= 1000000; } else if (info.precision == TSDB_TIME_PRECISION_MICRO) { tickPerSec /= 1000; - } + } if (tickPerSec <= 0 || tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg10); @@ -2978,7 +2969,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { SSqlCmd* pCmd = &pSql->cmd; pCmd->command = pInfo->type; - + return TSDB_CODE_SUCCESS; } bool validateIpAddress(const char* ip, size_t size) { @@ -3066,8 +3057,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) { } bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { - const char* msg1 = "TWA/Diff not allowed to apply to super table directly"; - const char* msg2 = "TWA/Diff only support group by tbname for super table query"; + const char* msg1 = "TWA/Diff/Derivative/Irate not allowed to apply to super table directly"; + const char* msg2 = "TWA/Diff/Derivative/Irate only support group by tbname for super table query"; const char* msg3 = "function not support for super table query"; // filter sql function not supported by metric query yet. @@ -3080,7 +3071,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) } } - if (tscIsTWAQuery(pQueryInfo) || tscIsDiffDerivQuery(pQueryInfo)) { + if (tscIsTWAQuery(pQueryInfo) || tscIsDiffDerivQuery(pQueryInfo) || tscIsIrateQuery(pQueryInfo)) { if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) { invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return true; @@ -3098,7 +3089,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) } } else if (tscIsSessionWindowQuery(pQueryInfo)) { invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); - return true; + return true; } return false; @@ -3340,24 +3331,24 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, // TK_GT,TK_GE,TK_EQ,TK_NE are based on the pColumn->lowerBndd } else if (pExpr->tokenId == TK_IN) { - tVariant *pVal; + tVariant *pVal; if (pRight->tokenId != TK_SET || !serializeExprListToVariant(pRight->pParam, &pVal, colType) || colType == TSDB_DATA_TYPE_TIMESTAMP) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg); - } + } if (validateParamOfRelationIn(pVal, colType) != TSDB_CODE_SUCCESS) { - tVariantDestroy(pVal); + tVariantDestroy(pVal); free(pVal); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg); } pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen + 1); pColumnFilter->len = pVal->nLen; pColumnFilter->filterstr = 1; - memcpy((char *)(pColumnFilter->pz), (char *)(pVal->pz), pVal->nLen); + memcpy((char *)(pColumnFilter->pz), (char *)(pVal->pz), pVal->nLen); //retVal = tVariantDump(pVal, (char *)(pColumnFilter->pz), TSDB_DATA_TYPE_BINARY, false); - tVariantDestroy(pVal); + tVariantDestroy(pVal); free(pVal); - + } else if (colType == TSDB_DATA_TYPE_BINARY) { pColumnFilter->pz = (int64_t)calloc(1, bufLen * TSDB_NCHAR_SIZE); pColumnFilter->len = pRight->value.nLen; @@ -4884,7 +4875,7 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t if (pRight->flags & (1 << EXPR_FLAG_NS_TIMESTAMP)) { pRight->value.i64 = convertTimePrecision(pRight->value.i64, TSDB_TIME_PRECISION_NANO, timePrecision); } - + tVariantDump(&pRight->value, (char*)&val, TSDB_DATA_TYPE_BIGINT, true); } @@ -5542,7 +5533,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pItem->type != TSDB_DATA_TYPE_BINARY && pItem->type != TSDB_DATA_TYPE_NCHAR) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg21); } - + SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER; SStrToken name = {.type = TK_STRING, .z = pItem->name, .n = (uint32_t)strlen(pItem->name)}; if (getColumnIndexByName(pCmd, &name, pQueryInfo, &columnIndex) != TSDB_CODE_SUCCESS) { @@ -5563,11 +5554,11 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { (pItem->type == TSDB_DATA_TYPE_NCHAR && (pItem->bytes <= 0 || pItem->bytes > TSDB_MAX_NCHAR_LEN))) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg24); } - + if (pItem->bytes <= pColSchema->bytes) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); } - + TAOS_FIELD f = tscCreateField(pColSchema->type, name.z, pItem->bytes); tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); }else if (pAlterSQL->type == TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN) { @@ -5579,7 +5570,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pItem->type != TSDB_DATA_TYPE_BINARY && pItem->type != TSDB_DATA_TYPE_NCHAR) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg21); } - + SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER; SStrToken name = {.type = TK_STRING, .z = pItem->name, .n = (uint32_t)strlen(pItem->name)}; if (getColumnIndexByName(pCmd, &name, pQueryInfo, &columnIndex) != TSDB_CODE_SUCCESS) { @@ -5604,11 +5595,11 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { (pItem->type == TSDB_DATA_TYPE_NCHAR && (pItem->bytes <= 0 || pItem->bytes > TSDB_MAX_NCHAR_LEN))) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg24); } - + if (pItem->bytes <= pColSchema->bytes) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); } - + TAOS_FIELD f = tscCreateField(pColSchema->type, name.z, pItem->bytes); tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); } @@ -5974,7 +5965,7 @@ static int32_t setKeepOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDbInfo* p tVariantListItem* p0 = taosArrayGet(pKeep, 0); tVariantListItem* p1 = (s > 1) ? taosArrayGet(pKeep, 1) : p0; tVariantListItem* p2 = (s > 2) ? taosArrayGet(pKeep, 2) : p1; - + if ((int32_t)p0->pVar.i64 <= 0 || (int32_t)p1->pVar.i64 <= 0 || (int32_t)p2->pVar.i64 <= 0) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -7068,7 +7059,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { } else { if (pQueryInfo->interval.interval == 0) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); - } + } } // set the created table[stream] name @@ -7791,7 +7782,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf } } - // todo derivate funtion requires ts column exists in subquery + // todo derivative function requires ts column exists in subquery STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0); @@ -7884,7 +7875,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; } - // parse the window_state + // parse the window_state if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; } @@ -8099,7 +8090,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pS SColIndex* idx = taosArrayGet(pCols, 0); SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, idx->colIndex); if (pSchema != NULL) { - colType = pSchema->type; + colType = pSchema->type; } } diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 7a0b3451c3b415e8e565fe091de6d1ed0e478409..ef46b4068ecb0641aa1730d48f1e4ad4d95b8222 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -283,6 +283,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { SArray* tables = getTableList(pSql); if (tables == NULL) { + pSub->lastSyncTime = 0; //force to get table list next time return 0; } size_t numOfTables = taosArrayGetSize(tables); @@ -489,7 +490,15 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; if (pSub == NULL) return NULL; - if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { + if (pSub->pTimer == NULL) { + int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime; + if (duration < (int64_t)(pSub->interval)) { + tscDebug("subscription consume too frequently, blocking..."); + taosMsleep(pSub->interval - (int32_t)duration); + } + } + + if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { //may reach here when retireve stable vgroup failed SSqlObj* pSql = recreateSqlObj(pSub); if (pSql == NULL) { return NULL; @@ -503,6 +512,11 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSub->pSql = pSql; pSql->pSubscription = pSub; + + // no table list now, force to update it + tscDebug("begin table synchronization"); + if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; + tscDebug("table synchronization completed"); } tscSaveSubscriptionProgress(pSub); @@ -527,14 +541,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey); } - if (pSub->pTimer == NULL) { - int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime; - if (duration < (int64_t)(pSub->interval)) { - tscDebug("subscription consume too frequently, blocking..."); - taosMsleep(pSub->interval - (int32_t)duration); - } - } - size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo); size += sizeof(SQueryTableMsg) + 4096; int code = tscAllocPayload(&pSql->cmd, (int)size); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index e7f0606db2ad1bcfbb257d79f016e91e87a8e787..22a603b71eacecb635350b518221654db9292810 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -109,7 +109,7 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { // pthread_mutex_unlock(&subState->mutex); // return false; // } - + tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx); subState->states[idx] = 1; @@ -622,7 +622,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); // set the tag column id for executor to extract correct tag value -#ifndef _TD_NINGSI_60 +#ifndef _TD_NINGSI_60 pExpr->base.param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)}; #else pExpr->base.param[0].i64 = colId; @@ -1843,7 +1843,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter // refactor as one method SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd); assert(pNewQueryInfo != NULL); - + pSupporter->colList = pNewQueryInfo->colList; pNewQueryInfo->colList = NULL; @@ -3158,7 +3158,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pRes->code = TSDB_CODE_TSC_APP_ERROR; return pRes->code; } - + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter)); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9011bae47ba78945e4f87dabb91cbb1d5d900d9e..9d2c500a92d8641d4b7f5c4573c74926fa97944d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -460,6 +460,22 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { return false; } +bool tscIsIrateQuery(SQueryInfo* pQueryInfo) { + size_t numOfExprs = tscNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < numOfExprs; ++i) { + SExprInfo* pExpr = tscExprGet(pQueryInfo, i); + if (pExpr == NULL) { + continue; + } + + if (pExpr->base.functionId == TSDB_FUNC_IRATE) { + return true; + } + } + + return false; +} + bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) { return pQueryInfo->sessionWindow.gap > 0; } @@ -3477,6 +3493,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { if (pSql->res.code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); } else { + pParentSql->res.code = pSql->res.code; tscAsyncResultOnError(pParentSql); } } diff --git a/src/connector/jdbc/CMakeLists.txt b/src/connector/jdbc/CMakeLists.txt index f6829bd0ead8c463a3c9d56156a6d7ec51057f1f..1db8361fafc985857987a4dfc5a041ae14c86df7 100644 --- a/src/connector/jdbc/CMakeLists.txt +++ b/src/connector/jdbc/CMakeLists.txt @@ -12,4 +12,4 @@ IF (TD_MVN_INSTALLED) COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml COMMENT "build jdbc driver") ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME}) -ENDIF () +ENDIF () \ No newline at end of file diff --git a/src/dnode/inc/dnodeCfg.h b/src/dnode/inc/dnodeCfg.h index 896b3f574c2e0d02a0d62048a411fa484d16130b..99733e46ef10ca7c17c1f49b344414ce7f564574 100644 --- a/src/dnode/inc/dnodeCfg.h +++ b/src/dnode/inc/dnodeCfg.h @@ -27,6 +27,7 @@ void dnodeUpdateCfg(SDnodeCfg *cfg); int32_t dnodeGetDnodeId(); void dnodeGetClusterId(char *clusterId); void dnodeGetCfg(int32_t *dnodeId, char *clusterId); +void dnodeSetDropped(); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeCfg.c b/src/dnode/src/dnodeCfg.c index 586adacc98712e63aa8d83367cb5d2ec5f9157ec..4269c77bf33eb9aa5fc0edafde39f9b4772d53d9 100644 --- a/src/dnode/src/dnodeCfg.c +++ b/src/dnode/src/dnodeCfg.c @@ -21,6 +21,7 @@ static SDnodeCfg tsCfg = {0}; static pthread_mutex_t tsCfgMutex; +static int32_t tsDnodeDropped; static int32_t dnodeReadCfg(); static int32_t dnodeWriteCfg(); @@ -34,6 +35,10 @@ int32_t dnodeInitCfg() { if (ret == 0) { dInfo("dnode cfg is initialized"); } + if (tsDnodeDropped) { + dInfo("dnode is dropped, exiting"); + return -1; + } return ret; } @@ -44,6 +49,14 @@ void dnodeUpdateCfg(SDnodeCfg *cfg) { dnodeResetCfg(cfg); } +void dnodeSetDropped() { + pthread_mutex_lock(&tsCfgMutex); + tsDnodeDropped = 1; + + dnodeWriteCfg(); + pthread_mutex_unlock(&tsCfgMutex); +} + int32_t dnodeGetDnodeId() { int32_t dnodeId = 0; pthread_mutex_lock(&tsCfgMutex); @@ -119,6 +132,14 @@ static int32_t dnodeReadCfg() { } cfg.dnodeId = (int32_t)dnodeId->valueint; + cJSON *dnodeDropped = cJSON_GetObjectItem(root, "dnodeDropped"); + if (!dnodeDropped || dnodeDropped->type != cJSON_Number) { + dError("failed to read %s, dnodeDropped not found", file); + //goto PARSE_CFG_OVER; + } else { + tsDnodeDropped = (int32_t)dnodeDropped->valueint; + } + cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { dError("failed to read %s, clusterId not found", file); @@ -154,6 +175,7 @@ static int32_t dnodeWriteCfg() { len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsCfg.dnodeId); + len += snprintf(content + len, maxLen - len, " \"dnodeDropped\": %d,\n", tsDnodeDropped); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsCfg.clusterId); len += snprintf(content + len, maxLen - len, "}\n"); diff --git a/src/dnode/src/dnodeVnodes.c b/src/dnode/src/dnodeVnodes.c index d96251cebe1cbbcddeb48f1a09091e1c09caf630..f01a510370758a04fe8972304ae352b796dc6e35 100644 --- a/src/dnode/src/dnodeVnodes.c +++ b/src/dnode/src/dnodeVnodes.c @@ -202,6 +202,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { char clusterId[TSDB_CLUSTER_ID_LEN]; dnodeGetClusterId(clusterId); if (clusterId[0] != '\0') { + dnodeSetDropped(); dError("exit zombie dropped dnode"); exit(EXIT_FAILURE); } diff --git a/src/query/inc/qTableMeta.h b/src/query/inc/qTableMeta.h index ec389de9e6dde48f3c7b0b200184ad75c4e0ec0f..4fc252b644efab0c6bfdb97b19c3af3ef9c68920 100644 --- a/src/query/inc/qTableMeta.h +++ b/src/query/inc/qTableMeta.h @@ -138,7 +138,6 @@ typedef struct SQueryInfo { bool hasFilter; bool onlyTagQuery; bool orderProjectQuery; -// bool diffQuery; bool stateWindow; } SQueryInfo; diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index 2a37b1fc2d81ca2f63cea18c521ed60a310881c9..63bfd859768f92da6ae1e90c27950337c528ab8e 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -176,7 +176,7 @@ cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(p ////////////////////////////// COMPACT STATEMENT ////////////////////////////////////////////// -cmd ::= COMPACT VNODES IN LP exprlist(Y) RP. { setCompactVnodeSql(pInfo, TSDB_SQL_COMPACT_VNODE, Y);} +cmd ::= COMPACT VNODES IN LP exprlist(Y) RP. { setCompactVnodeSql(pInfo, TSDB_SQL_COMPACT_VNODE, Y);} // An IDENTIFIER can be a generic identifier, or one of several keywords. // Any non-standard keyword can also be an identifier. diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 68de90255643ee6ceae6b560cd53226827c29017..bc14c75af561706a214fb950d2ae8567ef2c442c 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3613,16 +3613,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) { qError("error input type"); } - // initial value is not set yet, all data block are null - if (!pDerivInfo->valueSet || notNullElems <= 0) { - /* - * 1. current block and blocks before are full of null - * 2. current block may be null value - */ - assert(pCtx->hasNull); - } else { - GET_RES_INFO(pCtx)->numOfRes += notNullElems; - } + GET_RES_INFO(pCtx)->numOfRes += notNullElems; } #define DIFF_IMPL(ctx, d, type) \ diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 95f9a6b9557cd020d5f5bc2695fed3a747e5c103..7b106c178d8f7554d6316d8a5fc1e1504b763199 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1312,6 +1312,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn } } + + // todo opt perf for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { pInfo->binfo.pCtx[k].size = 1; int32_t functionId = pInfo->binfo.pCtx[k].functionId; diff --git a/tests/script/general/parser/function.sim b/tests/script/general/parser/function.sim index ad900b92e01ca46b7f4fb8afbd30a5a408bfb7ac..a485276e018e80c83ec6a1d34e4d597b85118a2a 100644 --- a/tests/script/general/parser/function.sim +++ b/tests/script/general/parser/function.sim @@ -817,6 +817,9 @@ print ====================> TODO stddev + normal column filter print ====================> irate +sql_error select irate(f1) from st1; +sql select irate(f1) from st1 group by tbname; + sql select irate(k) from t1 if $rows != 1 then return -1 @@ -1073,6 +1076,12 @@ sql insert into t0 values('2020-1-1 1:3:9', 9); sql insert into t0 values('2020-1-1 1:4:10', 10); sql insert into t1 values('2020-1-1 1:1:2', 2); +print ===========================>td-4739 +sql select diff(val) from (select derivative(k, 1s, 0) val from t1); +if $rows != 0 then + return -1 +endi + sql insert into t1 values('2020-1-1 1:1:4', 20); sql insert into t1 values('2020-1-1 1:1:6', 200); sql insert into t1 values('2020-1-1 1:1:8', 2000);