diff --git a/README.md b/README.md index 522fc0ebc1277684b77589c5c62d061f7b33ce32..36436dd549e22f58caefd82c4a9be4e8aed8d869 100644 --- a/README.md +++ b/README.md @@ -83,12 +83,18 @@ sudo dnf install -y maven ## Get the source codes -- github: +First of all, you may clone the source codes from github: ```bash git clone https://github.com/taosdata/TDengine.git cd TDengine ``` +The connectors for go & grafana have been moved to separated repositories, +so you should run this command in the TDengine directory to install them: +```bash +git submodule update --init --recursive +``` + ## Build TDengine ### On Linux platform diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 14fb32d7ab9058d565faf8de166e534d895be501..046bea2c27df8bbe1cc012d1511980f691ff60a5 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -229,7 +229,6 @@ typedef struct SQueryInfo { // TODO refactor STimeWindow window; // query time window SInterval interval; - int32_t tz; // query client timezone SSqlGroupbyExpr groupbyExpr; // group by tags info SArray * colList; // SArray diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index c5d622e2452d42e6863a294ce83fb850d21c5818..400613595b605bae4e5c7c23353aa4243a0eb933 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -509,7 +509,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) { + if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 5a33e05a538425ca5f67d4bab52b66d441651b92..a15673bdb1edf97fda23d879ab69839fd500549e 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -430,7 +430,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { SSqlObj* pSql = builder->pInterSql; if (row == NULL) { - return TSDB_CODE_MND_INVALID_TABLE_NAME; + return TSDB_CODE_TSC_INVALID_TABLE_NAME; } int32_t* lengths = taos_fetch_lengths(pSql); @@ -458,7 +458,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { } if (0 == strlen(result)) { - return TSDB_CODE_MND_INVALID_TABLE_NAME; + return TSDB_CODE_TSC_INVALID_TABLE_NAME; } return TSDB_CODE_SUCCESS; } @@ -554,7 +554,7 @@ int32_t tscRebuildCreateTableStatement(void *param,char *result) { static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { TAOS_ROW row = tscFetchRow(builder); if (row == NULL) { - return TSDB_CODE_MND_DB_NOT_SELECTED; + return TSDB_CODE_TSC_DB_NOT_SELECTED; } const char *showColumns[] = {"REPLICA", "QUORUM", "DAYS", "KEEP", "BLOCKS", NULL}; @@ -586,7 +586,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { } while (row != NULL); if (0 == strlen(result)) { - return TSDB_CODE_MND_DB_NOT_SELECTED; + return TSDB_CODE_TSC_DB_NOT_SELECTED; } return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ad39f9869d525083a190565369b708552ffb3edc..b65d7b7112f27358bab82308d4891b99c6f8daa3 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -362,8 +362,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } // additional msg has been attached already - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } return tscGetTableMeta(pSql, pTableMetaInfo); @@ -381,14 +382,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + code = tscSetTableFullName(pTableMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } + return tscGetTableMeta(pSql, pTableMetaInfo); } case TSDB_SQL_SHOW_CREATE_DATABASE: { const char* msg1 = "invalid database name"; - const char* msg2 = "table name is too long"; SStrToken* pToken = &pInfo->pDCLInfo->a[0]; if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { @@ -397,11 +399,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pToken->n > TSDB_DB_NAME_LEN) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - } - return TSDB_CODE_SUCCESS; - } + + return tscSetTableFullName(pTableMetaInfo, pToken, pSql); + } case TSDB_SQL_CFG_DNODE: { const char* msg2 = "invalid configure options or values, such as resetlog / debugFlag 135 / balance 'vnode:2-dnode:2' / monitor 1 "; const char* msg3 = "invalid dnode ep"; @@ -805,55 +805,44 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql) { const char* msg1 = "name too long"; - const char* msg2 = "current database or database name invalid"; SSqlCmd* pCmd = &pSql->cmd; int32_t code = TSDB_CODE_SUCCESS; // backup the old name in pTableMetaInfo - size_t size = strlen(pTableMetaInfo->name); - char* oldName = NULL; - if (size > 0) { - oldName = strdup(pTableMetaInfo->name); - } + char oldName[TSDB_TABLE_FNAME_LEN] = {0}; + tstrncpy(oldName, pTableMetaInfo->name, tListLen(oldName)); - if (hasSpecifyDB(pzTableName)) { - // db has been specified in sql string so we ignore current db path + if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); - if (code != TSDB_CODE_SUCCESS) { + if (code != 0) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - } else { // get current DB name first, then set it into path + } else { // get current DB name first, and then set it into path SStrToken t = {0}; getCurrentDBName(pSql, &t); - if (t.n == 0) { - code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + if (t.n == 0) { // current database not available or not specified + code = TSDB_CODE_TSC_DB_NOT_SELECTED; } else { code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); - if (code != TSDB_CODE_SUCCESS) { - code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + if (code != 0) { + invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - } + } } if (code != TSDB_CODE_SUCCESS) { - taosTFree(oldName); return code; } /* - * the old name exists and is not equalled to the new name. Release the metermeta/metricmeta + * the old name exists and is not equalled to the new name. Release the table meta * that are corresponding to the old name for the new table name. */ - if (size > 0) { - if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { - tscClearTableMetaInfo(pTableMetaInfo, false); - } - } else { - assert(pTableMetaInfo->pTableMeta == NULL); + if (strlen(oldName) > 0 && strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { + tscClearTableMetaInfo(pTableMetaInfo, false); } - taosTFree(oldName); return TSDB_CODE_SUCCESS; } @@ -4566,6 +4555,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { const char* msg18 = "primary timestamp column cannot be dropped"; const char* msg19 = "invalid new tag name"; + int32_t code = TSDB_CODE_SUCCESS; + SSqlCmd* pCmd = &pSql->cmd; SAlterTableSQL* pAlterSQL = pInfo->pAlterInfo; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); @@ -4576,13 +4567,14 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } - int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo); - if (ret != TSDB_CODE_SUCCESS) { - return ret; + code = tscGetTableMeta(pSql, pTableMetaInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; } STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; @@ -5880,8 +5872,9 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + int32_t code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql); + if(code != TSDB_CODE_SUCCESS) { + return code; } if (!validateTableColumnInfo(pFieldList, pCmd) || @@ -5935,15 +5928,16 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + int32_t code = tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } // get meter meta from mnode tstrncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, sizeof(pCreateTable->usingInfo.tagdata.name)); tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals; - int32_t code = tscGetTableMeta(pSql, pStableMeterMetaInfo); + code = tscGetTableMeta(pSql, pStableMeterMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -6020,7 +6014,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { const char* msg1 = "invalid table name"; - const char* msg2 = "table name too long"; const char* msg3 = "fill only available for interval query"; const char* msg4 = "fill option not supported in stream computing"; const char* msg5 = "sql too long"; // todo ADD support @@ -6052,11 +6045,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + int32_t code = tscSetTableFullName(pTableMetaInfo, &srcToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } - int32_t code = tscGetTableMeta(pSql, pTableMetaInfo); + code = tscGetTableMeta(pSql, pTableMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -6083,8 +6077,9 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { } // set the created table[stream] name - if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } if (pQuerySql->selectToken.n > TSDB_MAX_SAVED_SQL_LEN) { @@ -6128,7 +6123,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { assert(pQuerySql != NULL && (pQuerySql->from == NULL || pQuerySql->from->nExpr > 0)); const char* msg0 = "invalid table name"; - //const char* msg1 = "table name too long"; const char* msg2 = "point interpolation query needs timestamp"; const char* msg5 = "fill only available for interval query"; const char* msg6 = "start(end) time of query range required or time range too large"; @@ -6200,11 +6194,16 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i/2); SStrToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz}; - if (tscSetTableFullName(pTableMetaInfo1, &t, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo1, &t, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } tVariant* pTableItem1 = &pQuerySql->from->a[i + 1].pVar; + if (pTableItem1->nType != TSDB_DATA_TYPE_BINARY) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11); + } + SStrToken aliasName = {.z = pTableItem1->pz, .n = pTableItem1->nLen, .type = TK_STRING}; if (tscValidateName(&aliasName) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d3a16a5d6d650d93a158ba64b52a14898d421dc9..d3c02011690fd20e0cb7d7aaf7b98877581bbb42 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -53,7 +53,10 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); SRpcEpSet* pEpSet = &pSql->epSet; - pEpSet->inUse = 0; + + // Issue the query to one of the vnode among a vgroup randomly. + // change the inUse property would not affect the isUse attribute of STableMeta + pEpSet->inUse = rand() % pVgroupInfo->numOfEps; // apply the FQDN string length check here bool hasFqdn = false; @@ -144,12 +147,13 @@ void tscPrintMgmtEp() { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; if (pObj == NULL) return; + if (pObj != pObj->signature) { tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); return; } - SSqlObj *pSql = pObj->pHb; + SSqlObj *pSql = tres; SSqlRes *pRes = &pSql->res; if (code == 0) { @@ -170,10 +174,17 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("heart beat failed, code:%s", tstrerror(code)); + tscDebug("heartbeat failed, code:%s", tstrerror(code)); } - taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + if (pObj->pHb != NULL) { + int32_t waitingDuring = tsShellActivityTimer * 500; + tscDebug("%p start heartbeat in %dms", pSql, waitingDuring); + + taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); + } else { + tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); + } } void tscProcessActivityTimer(void *handle, void *tmrId) { @@ -249,6 +260,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlCmd *pCmd = &pSql->cmd; assert(*pSql->self == pSql); + pSql->pRpcCtx = NULL; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); @@ -258,8 +270,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { return; } - pSql->pRpcCtx = NULL; // clear the rpcCtx - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", @@ -474,6 +484,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; if (pSub->pRpcCtx != NULL) { rpcCancelRequest(pSub->pRpcCtx); + pSub->pRpcCtx = NULL; } tscQueueAsyncRes(pSub); // async res? not other functions? diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 91f7fd4638347223e620f06336ebc50bfca397f2..ac7081ba700cfd9a65069e5d268ea7157ca3727b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -698,6 +698,7 @@ void taos_stop_query(TAOS_RES *res) { tscKillSTableQuery(pSql); } else { if (pSql->cmd.command < TSDB_SQL_LOCAL) { + assert(pSql->pRpcCtx != NULL); rpcCancelRequest(pSql->pRpcCtx); } } diff --git a/src/connector/go b/src/connector/go index 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0..8c58c512b6acda8bcdfa48fdc7140227b5221766 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0 +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 17b2d24e909ff1a62f7c14b7ec658626b6deb159..b5d22ea80c2c0a9f338b537c3452929d0141e6e5 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -98,7 +98,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax errr in SQL") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax error in SQL") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, 0, 0x0217, "Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, 0, 0x0218, "Table does not exist") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") diff --git a/src/os/inc/osTime.h b/src/os/inc/osTime.h index 99f7586f721acbc64b1c9e3e70bf2e33a6092fc8..6b209219c6c736ed95c69659a78d14037987be00 100644 --- a/src/os/inc/osTime.h +++ b/src/os/inc/osTime.h @@ -63,9 +63,10 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { typedef struct SInterval { - char intervalUnit; - char slidingUnit; - char offsetUnit; + int32_t tz; // query client timezone + char intervalUnit; + char slidingUnit; + char offsetUnit; int64_t interval; int64_t sliding; int64_t offset; diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index bd4dc24554b26d0b4aae2cf8e82c02ee3fbba4b9..b78627f46f2124912338f3b029b5a0fd341e729c 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -481,7 +481,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision)); } else { int64_t delta = t - pInterval->interval; - int32_t factor = delta > 0 ? 1 : -1; + int32_t factor = (delta >= 0) ? 1 : -1; start = (delta / pInterval->sliding + factor) * pInterval->sliding; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 52765260f51d4099fed6603c7046f0ecfcdeb810..77a402c7bef84157c6188c9482086e8f62310882 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2225,10 +2225,11 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { return false; } -int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { +int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { SQuery *pQuery = pRuntimeEnv->pQuery; - *status = 0; + *status = BLK_DATA_NO_NEEDED; + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) { *status = BLK_DATA_ALL_NEEDED; } else { // check if this data block is required to load @@ -2240,12 +2241,26 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, } if ((*status) != BLK_DATA_ALL_NEEDED) { + // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, + // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + bool hasTimeWindow = false; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + + TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; + + STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo->tid, &win, masterScan, &hasTimeWindow) != + TSDB_CODE_SUCCESS) { + // todo handle error in set result for timewindow + } + } + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; int32_t functionId = pSqlFunc->functionId; int32_t colId = pSqlFunc->colInfo.colId; - (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { break; @@ -2476,7 +2491,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SArray * pDataBlock = NULL; uint32_t status = 0; - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); if (ret != TSDB_CODE_SUCCESS) { break; } @@ -4667,18 +4682,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); } - SDataStatis *pStatis = NULL; - SArray * pDataBlock = NULL; uint32_t status = 0; + SDataStatis *pStatis = NULL; + SArray *pDataBlock = NULL; - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); if (ret != TSDB_CODE_SUCCESS) { break; } if (status == BLK_DATA_DISCARD) { - pQuery->current->lastKey = - QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; + pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step; continue; } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 2a3facdb36c0b5ea71aa4b3140245f06404f8bea..06b5d39d6154af55b59fc6662c1ecb622c55e77e 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -62,7 +62,7 @@ typedef struct { char label[TSDB_LABEL_LEN]; int numOfThreads; void * shandle; - SThreadObj *pThreadObj; + SThreadObj **pThreadObj; pthread_t thread; } SServerObj; @@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); pServerObj->numOfThreads = numOfThreads; - pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); + pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); terrno = TAOS_SYSTEM_ERROR(errno); @@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); // initialize parameters in case it may encounter error later - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1); + if (pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + for (int j=0; jpThreadObj[j]); + free(pServerObj->pThreadObj); + free(pServerObj); + return NULL; + } + + pServerObj->pThreadObj[i] = pThreadObj; pThreadObj->pollFd = -1; taosResetPthread(&pThreadObj->thread); pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; - pThreadObj++; } // initialize mutex, thread, fd which may fail - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = pServerObj->pThreadObj[i]; code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); @@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pThreadObj->threadId = i; - pThreadObj++; } pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); @@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; eventfd_t fd = -1; + if (taosComparePthread(pThreadObj->thread, pthread_self())) { + pthread_detach(pthread_self()); + return; + } + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { // signal the thread to stop, try graceful method first, // and use pthread_cancel when failed @@ -183,15 +196,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL); - if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); - if (fd != -1) taosCloseSocket(fd); - - while (pThreadObj->pHead) { - SFdObj *pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosFreeFdObj(pFdObj); + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { + pthread_join(pThreadObj->thread, NULL); } + + if (fd != -1) taosCloseSocket(fd); } void taosStopTcpServer(void *handle) { @@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) { if (pServerObj == NULL) return; if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); - if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL); + + if (taosCheckPthreadValid(pServerObj->thread)) { + if (taosComparePthread(pServerObj->thread, pthread_self())) { + pthread_detach(pthread_self()); + } else { + pthread_join(pServerObj->thread, NULL); + } + } tDebug("%s TCP server is stopped", pServerObj->label); } @@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) { if (pServerObj == NULL) return; for (int i = 0; i < pServerObj->numOfThreads; ++i) { - pThreadObj = pServerObj->pThreadObj + i; + pThreadObj = pServerObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); - pthread_mutex_destroy(&(pThreadObj->mutex)); } tDebug("%s TCP server is cleaned up", pServerObj->label); @@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) { taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); // pick up the thread to handle this connection - pThreadObj = pServerObj->pThreadObj + threadId; + pThreadObj = pServerObj->pThreadObj[threadId]; SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); if (pFdObj) { @@ -329,8 +344,6 @@ void taosCleanUpTcpClient(void *chandle) { taosStopTcpThread(pThreadObj); tDebug ("%s TCP client is cleaned up", pThreadObj->label); - - taosTFree(pThreadObj); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { @@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) { pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } + + if (pThreadObj->stop) break; + } + + if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); + + while (pThreadObj->pHead) { + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + taosFreeFdObj(pFdObj); } + pthread_mutex_destroy(&(pThreadObj->mutex)); + tDebug("%s TCP thread exits ...", pThreadObj->label); + taosTFree(pThreadObj); + return NULL; } diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index eda822b1ec0b66ac984106566e03d5a1131ede8a..6a210a136ffe67b2e1394d26bac4cb5083452c8c 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) { continue; } } + } + + if (pThread->stop) break; } uDebug("%p TCP epoll thread exits", pThread); @@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) { } pthread_join(thread, NULL); - taosClose(fd); + if (fd >= 0) taosClose(fd); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 6c1602a857cc40cff59c5338a8e2b4372d081bba..d3f4747a96d509ef9764661a7f4bc7e8c8ceaef4 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1266,6 +1266,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* cur->pos >= 0 && cur->pos < pBlock->numOfRows); TSKEY* tsArray = pCols->cols[0].pData; + assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast); // for search the endPos, so the order needs to reverse int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen.py index 3e8659176b4be780a64bb62efe61bf850f897ed5..e024eb7494bb8a45e443691947d9dee04e1ab288 100755 --- a/tests/pytest/crash_gen/crash_gen.py +++ b/tests/pytest/crash_gen/crash_gen.py @@ -1648,6 +1648,8 @@ class Task(): if errno in [ 0x05, # TSDB_CODE_RPC_NOT_READY # 0x200, # invalid SQL, TODO: re-examine with TD-934 + 0x217, # "db not selected", client side defined error code + 0x218, # "Table does not exist" client side defined error code 0x360, 0x362, 0x369, # tag already exists 0x36A, 0x36B, 0x36D, diff --git a/tests/script/general/http/restful_full.sim b/tests/script/general/http/restful_full.sim index a02140a4199054775c405ec3c6223faa2ef681e4..8d2f1a7c00304c42c91311ae703bcef97aa6ace0 100644 --- a/tests/script/general/http/restful_full.sim +++ b/tests/script/general/http/restful_full.sim @@ -119,7 +119,7 @@ endi system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql print 17-> $system_content -if $system_content != @{"status":"error","code":534,"desc":"Syntax errr in SQL"}@ then +if $system_content != @{"status":"error","code":534,"desc":"Syntax error in SQL"}@ then return -1 endi diff --git a/tests/script/general/parser/topbot.sim b/tests/script/general/parser/topbot.sim index 5c575b61633d77521ef0c8b51b6e46aaf84447aa..c2b41888d739c559b24784c42ad040d54d7ae160 100644 --- a/tests/script/general/parser/topbot.sim +++ b/tests/script/general/parser/topbot.sim @@ -213,4 +213,53 @@ if $data01 != 5195.000000000 then return -1 endi +print =======================>td-1596 +sql create table t2(ts timestamp, k int) +sql insert into t2 values('2020-1-2 1:1:1', 1); +sql insert into t2 values('2020-2-2 1:1:1', 1); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start +sql connect +sleep 1000 + +sql use db +sql select count(*), first(ts), last(ts) from t2 interval(1d); +if $rows != 2 then + return -1 +endi + +if $data00 != @20-01-02 00:00:00.000@ then + print expect 20-01-02 00:00:00.000, actual: $data00 + return -1 +endi + +if $data10 != @20-02-02 00:00:00.000@ then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data11 != 1 then + return -1 +endi + +if $data02 != @20-01-02 01:01:01.000@ then + return -1 +endi + +if $data12 != @20-02-02 01:01:01.000@ then + return -1 +endi + +if $data03 != @20-01-02 01:01:01.000@ then + return -1 +endi + +if $data13 != @20-02-02 01:01:01.000@ then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index 46442e65b1747aeeb5fb73bea7ed949db10faa64..e609dda652304846081c28a43ed6745be3806eb3 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -238,6 +238,13 @@ if $data11 != @19-01-01 09:10:00.000@ then endi sql create table tb_where_NULL (ts timestamp, c1 float, c2 binary(10)) + +print ===================>td-1604 +sql_error insert into tb_where_NULL values(?, ?, ?) +sql_error insert into tb_where_NULL values(now, 1, ?) +sql_error insert into tb_where_NULL values(?, 1, '') +sql_error insert into tb_where_NULL values(now, ?, '12') + sql insert into tb_where_NULL values ('2019-01-01 09:00:00.000', 1, 'val1') sql insert into tb_where_NULL values ('2019-01-01 09:00:01.000', NULL, NULL) sql insert into tb_where_NULL values ('2019-01-01 09:00:02.000', 2, 'val2') @@ -334,4 +341,5 @@ if $rows != 0 then return -1 endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file