diff --git a/docs/zh/12-taos-sql/01-data-type.md b/docs/zh/12-taos-sql/01-data-type.md index be5c9a8cb4ed7f4ed9f9c7e11faf1b0f8f6e51b8..b8c9cc95547a8ea8b6b5f0d2d0e049dd20232db5 100644 --- a/docs/zh/12-taos-sql/01-data-type.md +++ b/docs/zh/12-taos-sql/01-data-type.md @@ -34,11 +34,6 @@ CREATE DATABASE db_name PRECISION 'ns'; | 10 | NCHAR | 自定义 | 记录包含多字节字符在内的字符串,如中文字符。每个 nchar 字符占用 4 bytes 的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 `\’`。nchar 使用时须指定字符串大小,类型为 nchar(10) 的列表示此列的字符串最多存储 10 个 nchar 字符,会固定占用 40 bytes 的空间。如果用户字符串长度超出声明长度,将会报错。 | | 11 | JSON | | json 数据类型, 只有 tag 可以是 json 格式 | -:::tip -TDengine 对 SQL 语句中的英文字符不区分大小写,自动转化为小写执行。因此用户大小写敏感的字符串及密码,需要使用单引号将字符串引起来。 - -::: - :::note 虽然 BINARY 类型在底层存储上支持字节型的二进制字符,但不同编程语言对二进制数据的处理方式并不保证一致,因此建议在 BINARY 类型中只存储 ASCII 可见字符,而避免存储不可见字符。多字节的数据,例如中文字符,则需要使用 NCHAR 类型进行保存。如果强行使用 BINARY 类型保存中文字符,虽然有时也能正常读写,但并不带有字符集信息,很容易出现数据乱码甚至数据损坏等情况。 diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 1bd7d1f453866c00a3ed613e49244188c5a85fba..4b8894a022bb12c260434a21e84c7213af65fb90 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -50,7 +50,7 @@ void tscUnlockByThread(int64_t *lockedBy); int tsInsertInitialCheck(SSqlObj *pSql); -void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs); +void doCleanupSubqueries(SSqlObj *pSql); void tscFreeRetrieveSup(void **param); diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index ee2f044de6c943653f80d0c78e45a4313fb47fa4..0f27fbc027b7a4e2e2b568233bb56cf140a32bb1 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -351,7 +351,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); -int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries); +int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries); void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 02dacfeb04209790289fb60a4a5e8ce3cee37283..f71b345b17be40d0a6b411d9f84a5bccd9da053f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -361,6 +361,7 @@ typedef struct SSubqueryState { int8_t *states; int32_t numOfSub; // the number of total sub-queries uint64_t numOfRetrievedRows; // total number of points in this query + uint32_t version; } SSubqueryState; typedef struct SSqlObj { @@ -388,7 +389,6 @@ typedef struct SSqlObj { SSqlRes res; SSubqueryState subState; - pthread_mutex_t mtxSubs; // avoid double access pSubs after failure struct SSqlObj **pSubs; struct SSqlObj *rootObj; @@ -441,6 +441,12 @@ typedef struct SSqlStream { struct SSqlStream *prev, *next; } SSqlStream; +SSqlObj* tscAllocSqlObj(); +uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql); +SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx, uint32_t stateVersion); +void tscReleaseRefOfSubobj(SSqlObj *pSql); +void tscResetAllSubStates(SSqlObj* pSql); + void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 8e05ea73a52031e027186e19e30c457202670736..e5d7d794be09261c9c78d2a7923f98c64a850099 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -363,8 +363,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para pSql->fetchFp = fp; pSql->rootObj = pSql; - pthread_mutex_init(&pSql->mtxSubs, NULL); - registerSqlObj(pSql); pSql->sqlstr = calloc(1, sqlLen + 1); @@ -435,7 +433,7 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v nPrintTsc("%s", sqlstr); - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + SSqlObj *pSql = tscAllocSqlObj(); if (pSql == NULL) { tscError("failed to malloc sqlObj"); tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY); diff --git a/src/client/src/tscDelete.c b/src/client/src/tscDelete.c index c03f7d0574fc56fb898f88d60b86a3a4062d0ae6..978d824e915aaf9f5bcd717978091104939234e0 100644 --- a/src/client/src/tscDelete.c +++ b/src/client/src/tscDelete.c @@ -117,7 +117,7 @@ void writeMsgVgId(char * payload, int32_t vgId) { SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) { // Init SSqlCmd* pCmd = &pSql->cmd; - SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); + SSqlObj* pNew = tscAllocSqlObj(); if (pNew == NULL) { tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -153,7 +153,6 @@ SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrie // update vgroup id writeMsgVgId(pNewCmd->payload ,pVgroupMsg->vgId); - tsem_init(&pNew->rspSem, 0, 0); registerSqlObj(pNew); tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew); @@ -196,24 +195,26 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { return TSDB_CODE_VND_INVALID_VGROUP_ID; } - SSubqueryState *pState = &pSql->subState; int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; if(numOfSub == 0) { tscInfo(":CDEL SQL:%p tablename=%s numOfVgroups is zero, maybe empty table.", pSql, pTableMetaInfo->name.tname); return TSDB_CODE_FAILED; } - ret = doInitSubState(pSql, numOfSub); - if (ret != 0) { + ret = doReInitSubState(pSql, numOfSub); + if (ret != TSDB_CODE_SUCCESS) { tscAsyncResultOnError(pSql); return ret; } - tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); + tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub); pRes->code = TSDB_CODE_SUCCESS; - int32_t i; - for (i = 0; i < pState->numOfSub; ++i) { + int32_t i = 0; + + { pthread_mutex_lock(&pSql->subState.mutex); + + for (i = 0; i < pSql->subState.numOfSub; ++i) { // vgroup SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i]; @@ -235,23 +236,20 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { } pSql->pSubs[i] = pNew; } - - if (i < pState->numOfSub) { + + if (i < pSql->subState.numOfSub) { tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - - doCleanupSubqueries(pSql, i); - return pRes->code; // free all allocated resource } - - if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - doCleanupSubqueries(pSql, i); + + pthread_mutex_unlock(&pSql->subState.mutex); } + + if (pRes->code != TSDB_CODE_SUCCESS) { + doCleanupSubqueries(pSql); return pRes->code; } // send sub sql doConcurrentlySendSubQueries(pSql); - //return TSDB_CODE_TSC_QUERY_CANCELLED; - return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index cda3b0a50af7014f1cd2df728cdb40b23cc11ba1..8e4bdc8b2b436a43cde9a4649e31c0430facbc88 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -295,7 +295,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) { taos_fetch_rows_a(tres, tscSCreateCallBack, param); builder->callStage = SCREATE_CALLBACK_RETRIEVE; } else { - char *result = calloc(1, TSDB_MAX_BINARY_LEN); + char *result = calloc(1, TSDB_MAX_SQL_LEN); pRes->code = builder->fp(builder, result); taos_free_result(pSql); @@ -359,6 +359,7 @@ TAOS_ROW tscFetchRow(void *param) { tscClearSqlOwner(pSql); return data; } + static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { TAOS_ROW row = tscFetchRow(builder); SSqlObj* pSql = builder->pInterSql; @@ -381,16 +382,16 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { int32_t ret = tscGetNthFieldResult(row, fields, lengths, i, buf); if (i == 0) { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s", "("); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "%s", "("); } if ((fields[i].type == TSDB_DATA_TYPE_NCHAR || fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_TIMESTAMP) && 0 == ret) { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "\"%s\",", buf); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "\"%s\",", buf); } else if (fields[i].type == TSDB_DATA_TYPE_JSON) { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "'%s,", buf); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "'%s,", buf); } else { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s,", buf); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "%s,", buf); } free(buf); @@ -487,14 +488,14 @@ int32_t tscRebuildCreateTableStatement(void *param,char *result) { SCreateBuilder *builder = (SCreateBuilder *)param; int32_t code = TSDB_CODE_SUCCESS; - char *buf = calloc(1,TSDB_MAX_BINARY_LEN); + char *buf = calloc(1,TSDB_MAX_SQL_LEN); if (buf == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } code = tscGetTableTagValue(builder, buf); if (code == TSDB_CODE_SUCCESS) { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "CREATE TABLE `%s` USING `%s` TAGS %s", builder->buf, builder->sTableName, buf); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "CREATE TABLE `%s` USING `%s` TAGS %s", builder->buf, builder->sTableName, buf); code = tscSCreateBuildResult(builder->pParentSql, SCREATE_BUILD_TABLE, builder->buf, result); } free(buf); @@ -537,7 +538,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { int32_t* lengths = taos_fetch_lengths(pSql); int32_t ret = tscGetNthFieldResult(row, fields, lengths, 0, buf); if (0 == ret && 0 == strcmp(buf, builder->buf)) { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "CREATE DATABASE `%s`", buf); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "CREATE DATABASE `%s`", buf); for (int i = 1; i < num_fields; i++) { for (int j = 0; showColumns[j][0] != NULL; j++) { if (STR_NOCASE_EQUAL(fields[i].name, strlen(fields[i].name), showColumns[j][0], strlen(showColumns[j][0]))) { @@ -545,9 +546,9 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { ret = tscGetNthFieldResult(row, fields, lengths, i, buf); if (ret == 0) { if (STR_NOCASE_EQUAL(showColumns[j][0], strlen(showColumns[j][0]), "PRECISION", strlen("PRECISION"))) { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), " %s '%s'", showColumns[j][1], buf); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), " %s '%s'", showColumns[j][1], buf); } else { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), " %s %s", showColumns[j][1], buf); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), " %s %s", showColumns[j][1], buf); } } } @@ -569,7 +570,7 @@ int32_t tscRebuildCreateDBStatement(void *param,char *result) { SCreateBuilder *builder = (SCreateBuilder *)param; int32_t code = TSDB_CODE_SUCCESS; - char *buf = calloc(1, TSDB_MAX_BINARY_LEN); + char *buf = calloc(1, TSDB_MAX_SQL_LEN); if (buf == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -582,7 +583,7 @@ int32_t tscRebuildCreateDBStatement(void *param,char *result) { } static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) { - char *buf = (char *)malloc(TSDB_MAX_BINARY_LEN); + char *buf = (char *)malloc(TSDB_MAX_SQL_LEN); if (buf == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -599,22 +600,23 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) { int32_t numOfTags = tscGetNumOfTags(pMeta); for (int32_t i = 0; i < numOfTags; i++) { if (i != numOfTags - 1) { - snprintf(buf + strlen(buf), TSDB_MAX_BINARY_LEN - strlen(buf), "`%s`,", pTagsSchema[i].name); + snprintf(buf + strlen(buf), TSDB_MAX_SQL_LEN - strlen(buf), "`%s`,", pTagsSchema[i].name); } else { - snprintf(buf + strlen(buf), TSDB_MAX_BINARY_LEN - strlen(buf), "`%s`", pTagsSchema[i].name); + snprintf(buf + strlen(buf), TSDB_MAX_SQL_LEN - strlen(buf), "`%s`", pTagsSchema[i].name); } } *result = buf; return TSDB_CODE_SUCCESS; } + static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) { SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pMeta = pTableMetaInfo->pTableMeta; - SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + SSqlObj *pInterSql = tscAllocSqlObj(); if (pInterSql == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -640,7 +642,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch param->fp = tscRebuildCreateTableStatement; param->callStage = SCREATE_CALLBACK_QUERY; - char *query = (char *)calloc(1, TSDB_MAX_BINARY_LEN); + char *query = (char *)calloc(1, TSDB_MAX_SQL_LEN); if (query == NULL) { free(param); free(pInterSql); @@ -656,7 +658,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch return code; } - snprintf(query + strlen(query), TSDB_MAX_BINARY_LEN - strlen(query), "SELECT %s FROM %s WHERE TBNAME IN(\'%s\')", columns, fullName, tblName); + snprintf(query + strlen(query), TSDB_MAX_SQL_LEN - strlen(query), "SELECT %s FROM %s WHERE TBNAME IN(\'%s\')", columns, fullName, tblName); doAsyncQuery(pSql->pTscObj, pInterSql, tscSCreateCallBack, param, query, strlen(query)); free(query); free(columns); @@ -681,9 +683,9 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, if (type == TSDB_DATA_TYPE_NCHAR) { bytes = bytes/TSDB_NCHAR_SIZE; } - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s(%d),", pSchema[i].name, tDataTypes[pSchema[i].type].name, bytes); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s(%d),", pSchema[i].name, tDataTypes[pSchema[i].type].name, bytes); } else { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[pSchema[i].type].name); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[pSchema[i].type].name); } } sprintf(result + strlen(result) - 1, "%s", ")"); @@ -708,12 +710,12 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, if (type == TSDB_DATA_TYPE_NCHAR) { bytes = bytes/TSDB_NCHAR_SIZE; } - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"`%s` %s(%d),", pSchema[i].name,tDataTypes[pSchema[i].type].name, bytes); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result),"`%s` %s(%d),", pSchema[i].name,tDataTypes[pSchema[i].type].name, bytes); } else { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[type].name); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[type].name); } } - snprintf(result + strlen(result) - 1, TSDB_MAX_BINARY_LEN - strlen(result), "%s %s", ")", "TAGS ("); + snprintf(result + strlen(result) - 1, TSDB_MAX_SQL_LEN - strlen(result), "%s %s", ")", "TAGS ("); for (int32_t i = numOfRows; i < totalRows; i++) { uint8_t type = pSchema[i].type; @@ -722,9 +724,9 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, if (type == TSDB_DATA_TYPE_NCHAR) { bytes = bytes/TSDB_NCHAR_SIZE; } - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s(%d),", pSchema[i].name,tDataTypes[pSchema[i].type].name, bytes); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s(%d),", pSchema[i].name,tDataTypes[pSchema[i].type].name, bytes); } else { - snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[type].name); + snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[type].name); } } sprintf(result + strlen(result) - 1, "%s", ")"); @@ -742,7 +744,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) { return TSDB_CODE_TSC_INVALID_VALUE; } - char *result = (char *)calloc(1, TSDB_MAX_BINARY_LEN); + char *result = (char *)calloc(1, TSDB_MAX_SQL_LEN); int32_t code = TSDB_CODE_SUCCESS; if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscRebuildDDLForSuperTable(pSql, tableName, result); @@ -766,7 +768,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + SSqlObj *pInterSql = tscAllocSqlObj(); if (pInterSql == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 559a488b8a6c2e0828c76688df07fa2093392284..c1bdbce91dacd413da79d2ac4ac4af12def2ab06 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -558,7 +558,7 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab int32_t code = 0; STableMeta* tableMeta = NULL; - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + SSqlObj* pSql = tscAllocSqlObj(); if (pSql == NULL) { tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno)); code = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -2763,7 +2763,7 @@ static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) { //make a dummy SSqlObj static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) { - SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); + SSqlObj *pNew = tscAllocSqlObj(); if (pNew == NULL) { return NULL; } @@ -2771,7 +2771,6 @@ static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t cod pNew->pTscObj = taos; pNew->fp = NULL; - tsem_init(&pNew->rspSem, 0, 0); registerSqlObj(pNew); pNew->res.numOfRows = affected_rows; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 8e4c417fbfe759bc383191b9d7a9b49f41978435..3f94cdc64a85df2db86db8bc75a7ce77c3c4403b 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1587,7 +1587,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { } pStmt->taos = pObj; - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + SSqlObj* pSql = tscAllocSqlObj(); if (pSql == NULL) { free(pStmt); @@ -1604,7 +1604,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { return NULL; } - tsem_init(&pSql->rspSem, 0, 0); pSql->signature = pSql; pSql->pTscObj = pObj; pSql->maxRetry = TSDB_MAX_REPLICA; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 7de9ef762a29c47bf56eb63630be2e45cf269b39..3162b052806fc34208bdecb5afbe1bea1b9c469a 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -281,7 +281,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { // } else { // pQdesc->stableQuery = 0; // } - pthread_mutex_lock(&pSql->subState.mutex); + { pthread_mutex_lock(&pSql->subState.mutex); + if (pSql->pSubs != NULL && pSql->subState.states != NULL) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { // because subState maybe free on anytime by any thread, check validate from here @@ -298,7 +299,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { } } pQdesc->numOfSub = pSql->subState.numOfSub; - pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_unlock(&pSql->subState.mutex); } } pQdesc->numOfSub = htonl(pQdesc->numOfSub); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 17c70125eebb2ef4c2f7a4c62d9ff2e022b5889c..8eb7f15fde6a878ff617e047826df91c9e2c1878 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -10081,7 +10081,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { size_t tableMetaCapacity = 0; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); - pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); tableNameList = taosArrayInit(4, sizeof(SName)); size_t size = taosArrayGetSize(pInfo->list); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 2818d1014424c442454d4664859d3d90ab58ebdd..8cc28bb2ca8a8ce678dceab83c05dd1ff73d250b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) { pSql->lastAlive = taosGetTimestampMs(); } } else { - // lock subs - pthread_mutex_lock(&pSql->subState.mutex); + { pthread_mutex_lock(&pSql->subState.mutex); + if (pSql->pSubs) { // have sub sql for (int i = 0; i < pSql->subState.numOfSub; i++) { @@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) { } } } - // unlock - pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_unlock(&pSql->subState.mutex); } } // kill query @@ -2564,7 +2564,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { } if (pParentCmd->pTableMetaMap == NULL) { - pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); } for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) { @@ -2810,7 +2810,7 @@ static void createHbObj(STscObj* pObj) { return; } - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + SSqlObj *pSql = tscAllocSqlObj(); if (NULL == pSql) return; pSql->fp = tscProcessHeartBeatRsp; @@ -3083,7 +3083,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { void tscTableMetaCallBack(void *param, TAOS_RES *res, int code); static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) { - SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); + SSqlObj *pNew = tscAllocSqlObj(); if (NULL == pNew) { tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -3149,7 +3149,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn } int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone) { - SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); + SSqlObj *pNew = tscAllocSqlObj(); if (NULL == pNew) { tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -3286,7 +3286,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create } int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { - SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); + SSqlObj *pNew = tscAllocSqlObj(); if (NULL == pNew) { tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -3391,15 +3391,16 @@ int tscRenewTableMeta(SSqlObj *pSql) { tscResetSqlCmd(pCmd, true, pSql->self); SSqlCmd* pCmd2 = &pSql->rootObj->cmd; - pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap); - pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + SHashObj *pmap = pCmd2->pTableMetaMap; + if (pmap == atomic_val_compare_exchange_ptr(&pCmd2->pTableMetaMap, pmap, NULL)) { + tscCleanupTableMetaMap(pCmd2->pTableMetaMap); + } + pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pSql->rootObj->retryReason = pSql->retryReason; SSqlObj *rootSql = pSql->rootObj; - pthread_mutex_lock(&rootSql->mtxSubs); tscFreeSubobj(rootSql); - pthread_mutex_unlock(&rootSql->mtxSubs); tscResetSqlCmd(&rootSql->cmd, true, rootSql->self); code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); @@ -3426,7 +3427,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { if (allVgroupInfoRetrieved(pQueryInfo)) { return TSDB_CODE_SUCCESS; } - SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); + SSqlObj *pNew = tscAllocSqlObj(); pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 7280efe1fafe37aef368526376bc53bb0e2364fb..9acebc3a28d5ffab6404f4fa847312f13ccc1208 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -149,7 +149,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa pthread_mutex_init(&pObj->mutex, NULL); - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + SSqlObj *pSql = tscAllocSqlObj(); if (NULL == pSql) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tscReleaseRpc(pRpcObj); @@ -164,8 +164,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa pSql->param = param; pSql->cmd.command = TSDB_SQL_CONNECT; - tsem_init(&pSql->rspSem, 0, 0); - if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tscReleaseRpc(pRpcObj); @@ -305,10 +303,6 @@ void taos_close(TAOS *taos) { tscDebug("0x%"PRIx64" HB is freed", pHb->self); taosReleaseRef(tscObjRef, pHb->self); -#ifdef __APPLE__ - // to satisfy later tsem_destroy in taos_free_result - tsem_init(&pHb->rspSem, 0, 0); -#endif // __APPLE__ taos_free_result(pHb); } } @@ -363,14 +357,13 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, int64_t* nPrintTsc("%s", sqlstr); - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + SSqlObj* pSql = tscAllocSqlObj(); if (pSql == NULL) { tscError("failed to malloc sqlObj"); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - tsem_init(&pSql->rspSem, 0, 0); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); if (res != NULL) { @@ -754,7 +747,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; tscLockByThread(&pSql->squeryLock); - + + { pthread_mutex_lock(&pSql->subState.mutex); + for (int i = 0; i < pSql->subState.numOfSub; ++i) { // NOTE: pSub may have been released already here SSqlObj *pSub = pSql->pSubs[i]; @@ -774,6 +769,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { // taosRelekaseRef(tscObjRef, pSubObj->self); } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (pSql->subState.numOfSub <= 0) { tscAsyncResultOnError(pSql); } @@ -1047,7 +1044,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { return TSDB_CODE_TSC_DISCONNECTED; } - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + SSqlObj* pSql = tscAllocSqlObj(); pSql->pTscObj = taos; pSql->signature = pSql; @@ -1165,7 +1162,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + SSqlObj* pSql = tscAllocSqlObj(); tscAllocPayload(&pSql->cmd, 1024); pSql->pTscObj = taos; @@ -1182,7 +1179,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return code; } - pSql->cmd.pTableMetaMap = taosHashInit(taosArrayGetSize(plist), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pSql->cmd.pTableMetaMap = taosHashInit(taosArrayGetSize(plist), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); registerSqlObj(pSql); tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index d5cce9d756d0ec138f10cf4badd374c30fa0b364..ac6b7355d52543727413393f45ca09b581ceab7a 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -1002,7 +1002,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t return NULL; } - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + SSqlObj *pSql = tscAllocSqlObj(); if (pSql == NULL) { return NULL; } @@ -1054,7 +1054,6 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t pSql->fetchFp = tscCreateStream; pSql->cmd.resColumnId = TSDB_RES_COL_ID; - tsem_init(&pSql->rspSem, 0, 0); registerSqlObj(pSql); tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index b3d18f50114cd88e75e559323fed8c7d6f934289..d48a92cb962317dc1e6554a12a6aa474c6a1f193 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { } } - static void asyncCallback(void *param, TAOS_RES *tres, int code) { assert(param != NULL); SSub *pSub = ((SSub *)param); @@ -117,7 +116,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* goto fail; } - pSql = calloc(1, sizeof(SSqlObj)); + pSql = tscAllocSqlObj(); if (pSql == NULL) { line = __LINE__; code = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -132,11 +131,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - if (tsem_init(&pSql->rspSem, 0, 0) == -1) { - line = __LINE__; - code = TAOS_SYSTEM_ERROR(errno); - goto fail; - } pSql->param = pSub; pSql->maxRetry = TSDB_MAX_REPLICA; @@ -432,7 +426,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char } SSqlObj* recreateSqlObj(SSub* pSub) { - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + SSqlObj* pSql = tscAllocSqlObj(); if (pSql == NULL) { return NULL; } @@ -442,10 +436,6 @@ SSqlObj* recreateSqlObj(SSub* pSub) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - if (tsem_init(&pSql->rspSem, 0, 0) == -1) { - tscFreeSqlObj(pSql); - return NULL; - } pSql->param = pSub; pSql->maxRetry = TSDB_MAX_REPLICA; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index a8c2eea195992c4a6e66cbfd49bf9e36f2cf4c8c..d0a403f6435b378882e8d5198ec640b90ceede5c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -66,16 +66,14 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { } } -static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { +static void subquerySetStateWithoutLock(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { assert(idx < subState->numOfSub && subState->states != NULL); tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state); - pthread_mutex_lock(&subState->mutex); subState->states[idx] = state; - pthread_mutex_unlock(&subState->mutex); } -static bool allSubqueryDone(SSqlObj *pParentSql) { +static bool allSubqueryDoneWithoutLock(SSqlObj *pParentSql) { bool done = true; SSubqueryState *subState = &pParentSql->subState; @@ -107,7 +105,7 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx); subState->states[idx] = 1; } - bool done = allSubqueryDone(pParentSql); + bool done = allSubqueryDoneWithoutLock(pParentSql); if (!done) { tscDebug("0x%"PRIx64" sub:%p,%d completed, total:%d", pParentSql->self, pSql, idx, pParentSql->subState.numOfSub); } @@ -115,8 +113,6 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { return done; } - - static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); @@ -125,7 +121,6 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { SLimitVal* pLimit = &pQueryInfo->limit; int32_t order = pQueryInfo->order.order; - int32_t joinNum = pSql->subState.numOfSub; SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}}; SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0}; int32_t slot = 0; @@ -140,25 +135,39 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { STSElem prev; SArray* tsCond = NULL; int32_t mergeDone = 0; + int32_t joinNum = 0; + + int errflag = 0; + { pthread_mutex_lock(&pSql->subState.mutex); + + joinNum = pSql->subState.numOfSub; for (int32_t i = 0; i < joinNum; ++i) { + SSqlObj *pSub = pSql->pSubs[i]; + if (!pSub) { + tscError("0x%"PRIx64" the %d'th of the (%d) sub queries is null.", pSql->self, i, joinNum); + errflag = 1; + break; + } STSBuf* output = tsBufCreate(true, pQueryInfo->order.order); - SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd); + SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSub->cmd); pSubQueryInfo->tsBuf = output; - SJoinSupporter* pSupporter = pSql->pSubs[i]->param; + SJoinSupporter* pSupporter = pSub->param; if (pSupporter->pTSBuf == NULL) { tscDebug("0x%"PRIx64" at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql->self); - return 0; + errflag = 1; + break; } tsBufResetPos(pSupporter->pTSBuf); if (!tsBufNextPos(pSupporter->pTSBuf)) { tscDebug("0x%"PRIx64" input1 is empty, 0 for secondary query after ts blocks intersecting", pSql->self); - return 0; + errflag = 1; + break; } tscDebug("0x%"PRIx64" sub:0x%"PRIx64" table idx:%d, input group number:%d", pSql->self, @@ -168,6 +177,11 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { ctxlist[i].res = output; } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (errflag) { + return 0; + } + TSKEY st = taosGetTimestampUs(); for (int16_t tidx = 0; tidx < joinNum; tidx++) { @@ -523,7 +537,11 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int32_t numOfSub = 0; SJoinSupporter* pSupporter = NULL; - + bool success = true; + uint32_t stateVersion = 0; + + { pthread_mutex_lock(&pSql->subState.mutex); + //If the columns are not involved in the final select clause, the corresponding query will not be issued. for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { pSupporter = pSql->pSubs[i]->param; @@ -531,20 +549,19 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ++numOfSub; } } - + assert(numOfSub > 0); - + // scan all subquery, if one sub query has only ts, ignore it tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub); - bool success = true; - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj *pPrevSub = pSql->pSubs[i]; + if (!pPrevSub) continue; pSql->pSubs[i] = NULL; - + pSupporter = pPrevSub->param; - + if (taosArrayGetSize(pSupporter->exprList) == 0) { tscDebug("0x%"PRIx64" subIndex: %d, no need to launch query, ignore it", pSql->self, i); @@ -572,7 +589,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; - + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd); pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object @@ -664,42 +681,54 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { pQueryInfo->stableQuery = true; } - subquerySetState(pNew, &pSql->subState, i, 0); - + subquerySetStateWithoutLock(pNew, &pSql->subState, i, 0); + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); tscDebug("0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); } - + + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + + pthread_mutex_unlock(&pSql->subState.mutex); } + //prepare the subqueries object failed, abort if (!success) { pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscError("0x%"PRIx64" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql->self, pSql->subState.numOfSub, pSql->res.code); - freeJoinSubqueryObj(pSql); - - return pSql->res.code; + goto _error; } - + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - if (pSql->pSubs[i] == NULL) { - continue; + SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref + if (pSub == NULL) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + pSql->res.code = TSDB_CODE_FAILED; + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + goto _error; } - - SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd); - executeQuery(pSql->pSubs[i], pQueryInfo); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); + executeQuery(pSub, pQueryInfo); + tscReleaseRefOfSubobj(pSub); // REL ref } return TSDB_CODE_SUCCESS; + +_error: + freeJoinSubqueryObj(pSql); + return pSql->res.code; } void freeJoinSubqueryObj(SSqlObj* pSql) { - if (pSql->subState.numOfSub == 0) { - return; - } - pthread_mutex_lock(&pSql->subState.mutex); + if (pSql->subState.numOfSub == 0) { + goto _out; + } + pSql->subState.version ++; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -713,13 +742,12 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { taos_free_result(pSub); pSql->pSubs[i] = NULL; } - + tfree(pSql->pSubs); tfree(pSql->subState.states); pSql->subState.numOfSub = 0; +_out: pthread_mutex_unlock(&pSql->subState.mutex); - - pthread_mutex_destroy(&pSql->subState.mutex); } static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { @@ -1031,18 +1059,24 @@ static int32_t tidTagsMergeSort(SArray *arr, int32_t start, int32_t end, const i } static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) { - int16_t joinNum = pParentSql->subState.numOfSub; STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); - SJoinSupporter* p0 = pParentSql->pSubs[0]->param; SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}}; SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0}; + SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); + int16_t joinNum = 0; + SJoinSupporter* p0 = NULL; + int32_t size = 0; + int32_t code = TSDB_CODE_SUCCESS; + + { pthread_mutex_lock(&pParentSql->subState.mutex); + + joinNum = pParentSql->subState.numOfSub; + p0 = pParentSql->pSubs[0]->param; // int16_t for padding - int32_t size = p0->tagSize - sizeof(int16_t); + size = p0->tagSize - sizeof(int16_t); - SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); - tscDebug("0x%"PRIx64" all subquery retrieve complete, do tags match", pParentSql->self); for (int32_t i = 0; i < joinNum; i++) { @@ -1064,10 +1098,16 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar for (int32_t j = 0; j <= i; j++) { taosArrayDestroy(&ctxlist[j].res); } - return TSDB_CODE_QRY_DUP_JOIN_KEY; + code = TSDB_CODE_QRY_DUP_JOIN_KEY; + break; } } + pthread_mutex_unlock(&pParentSql->subState.mutex); } + if (code != TSDB_CODE_SUCCESS) { + return code; + } + int32_t slot = 0; size_t tableNum = 0; int16_t* tableMIdx = 0; @@ -1384,22 +1424,34 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { + uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pParentSql); + for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { + SSqlObj *psub = tscAcquireRefOfSubobj(pParentSql, m, stateVersion); // ACQ ref + if (!psub) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pParentSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + break; + } + // proceed to for ts_comp query - SSqlCmd* pSubCmd = &pParentSql->pSubs[m]->cmd; + SSqlCmd* pSubCmd = &psub->cmd; SArray** s = taosArrayGet(resList, m); SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pSubCmd); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s); - SSqlObj* psub = pParentSql->pSubs[m]; ((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables); - memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); + tscResetAllSubStates(pParentSql); tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self); - + issueTsCompQuery(psub, psub->param, pParentSql); + + tscReleaseRefOfSubobj(psub); // REL ref } } @@ -1695,12 +1747,14 @@ _return: } void tscFetchDatablockForSubquery(SSqlObj* pSql) { - assert(pSql->subState.numOfSub >= 1); - int32_t numOfFetch = 0; bool hasData = true; bool reachLimit = false; + { pthread_mutex_lock(&pSql->subState.mutex); + + assert(pSql->subState.numOfSub >= 1); + // if the subquery is NULL, it does not involved in the final result generation for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -1731,6 +1785,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } + pthread_mutex_unlock(&pSql->subState.mutex); } + // has data remains in client side, and continue to return data to app if (hasData) { tscBuildResFromSubqueries(pSql); @@ -1754,8 +1810,11 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { if (numOfFetch <= 0) { bool tryNextVnode = false; - bool orderedPrjQuery = false; + uint32_t stateVersion = 0; + + { pthread_mutex_lock(&pSql->subState.mutex); + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -1769,21 +1828,28 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } - if (orderedPrjQuery) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { - subquerySetState(pSub, &pSql->subState, i, 0); + subquerySetStateWithoutLock(pSub, &pSql->subState, i, 0); } } } - + + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + + pthread_mutex_unlock(&pSql->subState.mutex); } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref if (pSub == NULL) { - continue; + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + pSql->res.code = TSDB_CODE_FAILED; + break; } SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); @@ -1813,6 +1879,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self); } } + + tscReleaseRefOfSubobj(pSub); // REL ref } if (tryNextVnode) { @@ -1836,24 +1904,34 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch); SJoinSupporter* pSupporter = NULL; + uint32_t stateVersion = 0; + + { pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; if (pSql1 == NULL) { continue; } - - SSqlRes* pRes1 = &pSql1->res; if (pRes1->row >= pRes1->numOfRows && !pRes1->completed) { - subquerySetState(pSql1, &pSql->subState, i, 0); + subquerySetStateWithoutLock(pSql1, &pSql->subState, i, 0); } } + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + + pthread_mutex_unlock(&pSql->subState.mutex); } + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSql1 = pSql->pSubs[i]; + SSqlObj* pSql1 = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref if (pSql1 == NULL) { - continue; + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + break; } SSqlRes* pRes1 = &pSql1->res; @@ -1880,6 +1958,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { tscBuildAndSendRequest(pSql1, NULL); } + tscReleaseRefOfSubobj(pSql1); // REF ref } } @@ -2041,39 +2120,32 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - + pSql->res.qId = 0x1; assert(pSql->res.numOfRows == 0); - if (pSql->pSubs == NULL) { - pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); - if (pSql->pSubs == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } - SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL); if (pNew == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - + pSql->pSubs[tableIndex] = pNew; - + if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); - + // refactor as one method SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd); assert(pNewQueryInfo != NULL); pSupporter->colList = pNewQueryInfo->colList; pNewQueryInfo->colList = NULL; - + pSupporter->exprList = pNewQueryInfo->exprList; pNewQueryInfo->exprList = NULL; - + pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo; - + // this data needs to be transfer to support struct memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) { @@ -2186,33 +2258,30 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { int32_t code = TSDB_CODE_SUCCESS; - if (pSql->subState.states == NULL) { - pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); - if (pSql->subState.states == NULL) { - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; - } - - pthread_mutex_init(&pSql->subState.mutex, NULL); + code = doReInitSubState(pSql, pQueryInfo->numOfTables); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } - pSql->subState.numOfSub = pQueryInfo->numOfTables; + uint32_t stateVersion = 0; + int errflag = 0; - memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); - tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables); + { pthread_mutex_lock(&pSql->subState.mutex); - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i); code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; + errflag = 1; + break; } - + code = tscCreateJoinSubquery(pSql, i, pSupporter); if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query tscDestroyJoinSupporter(pSupporter); - goto _error; + errflag = 1; + break; } SSqlObj* pSub = pSql->pSubs[i]; @@ -2223,23 +2292,38 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } } + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + + pthread_mutex_unlock(&pSql->subState.mutex); } + if (errflag) { + goto _error; + } + if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return freeJoinSubqueryObj(pSql); (*pSql->fp)(pSql->param, pSql, 0); } else { int fail = 0; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref + if (!pSub) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + code = TSDB_CODE_FAILED; + goto _error; + } if (fail) { (*pSub->fp)(pSub->param, pSub, 0); continue; } - + if ((code = tscBuildAndSendRequest(pSub, NULL)) != TSDB_CODE_SUCCESS) { pRes->code = code; (*pSub->fp)(pSub->param, pSub, 0); fail = 1; } + tscReleaseRefOfSubobj(pSub); // REL ref } if(fail) { @@ -2251,22 +2335,17 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { return; - _error: +_error: pRes->code = code; tscAsyncResultOnError(pSql); } -void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { +void doCleanupSubqueries(SSqlObj *pSql) { pthread_mutex_lock(&pSql->subState.mutex); - if (numOfSubs > pSql->subState.numOfSub || numOfSubs <= 0 || pSql->subState.numOfSub <= 0) { - pthread_mutex_unlock(&pSql->subState.mutex); - return; - } - - - for(int32_t i = 0; i < numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; - assert(pSub != NULL); + if (!pSub) continue; + pSql->pSubs[i] = NULL; tscFreeRetrieveSup(&pSub->param); @@ -2377,10 +2456,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { if (code != TSDB_CODE_SUCCESS) { tscFreeFirstRoundSup(¶m); taos_free_result(pSql); - pthread_mutex_lock(&pParent->subState.mutex); - pParent->subState.numOfSub = 0; - tfree(pParent->pSubs); - pthread_mutex_unlock(&pParent->subState.mutex); + tscFreeSubobj(pParent); pParent->res.code = code; tscAsyncResultOnError(pParent); return; @@ -2483,11 +2559,8 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscFreeFirstRoundSup(¶m); taos_free_result(pSql); - pthread_mutex_lock(&pParent->subState.mutex); - pParent->subState.numOfSub = 0; - tfree(pParent->pSubs); - pthread_mutex_unlock(&pParent->subState.mutex); - + tscFreeSubobj(pParent); + if (resRows == 0) { pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pParent->fp)(pParent->param, pParent, 0); @@ -2509,10 +2582,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { tscFreeFirstRoundSup(¶m); taos_free_result(pSql); - pthread_mutex_lock(&parent->subState.mutex); - parent->subState.numOfSub = 0; - tfree(parent->pSubs); - pthread_mutex_unlock(&parent->subState.mutex); + tscFreeSubobj(parent); parent->res.code = c; tscAsyncResultOnError(parent); return; @@ -2677,14 +2747,11 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo), idx+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); - pSql->pSubs = calloc(1, POINTER_BYTES); - if (pSql->pSubs == NULL) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + int32_t code = doReInitSubState(pSql, 1); + if (code != TSDB_CODE_SUCCESS) { goto _error; } - pSql->subState.numOfSub = 1; - pSql->pSubs[0] = pNew; tscHandleMasterSTableQuery(pNew); @@ -2706,17 +2773,19 @@ typedef struct SPair { static void doSendQueryReqs(SSchedMsg* pSchedMsg) { SSqlObj* pSql = pSchedMsg->ahandle; SPair* p = pSchedMsg->msg; + uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); for (int32_t i = p->first; i < p->second; ++i) { - if (i >= pSql->subState.numOfSub) { - tfree(p); - return; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref + if (!pSub) { + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + break; } - SSqlObj* pSub = pSql->pSubs[i]; SRetrieveSupport* pSupport = pSub->param; tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); tscBuildAndSendRequest(pSub, NULL); + tscReleaseRefOfSubobj(pSub); // REL ref } tfree(p); @@ -2779,12 +2848,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SSubqueryState *pState = &pSql->subState; - int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups : (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); - - int32_t ret = doInitSubState(pSql, numOfSub); + int32_t ret = doReInitSubState(pSql, numOfSub); if (ret != 0) { tscAsyncResultOnError(pSql); return ret; @@ -2799,11 +2865,11 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return ret; } - tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); + tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub); pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; - for (; i < pState->numOfSub; ++i) { + for (; i < pSql->subState.numOfSub; ++i) { SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); if (trs == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); @@ -2843,24 +2909,19 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self, trs->subqueryIndex); } - - if (i < pState->numOfSub) { + + if (i < pSql->subState.numOfSub) { tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - - tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub); - doCleanupSubqueries(pSql, i); - return pRes->code; // free all allocated resource } - - if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub); - doCleanupSubqueries(pSql, i); + + if (pRes->code != TSDB_CODE_SUCCESS) { + tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pSql->subState.numOfSub); + doCleanupSubqueries(pSql); return pRes->code; } doConcurrentlySendSubQueries(pSql); - return TSDB_CODE_SUCCESS; } @@ -3303,7 +3364,7 @@ SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSq pSql->pSubs[trsupport->subqueryIndex] = pNew; } - + return pNew; } @@ -3375,13 +3436,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } } -static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { +static bool needRetryInsert(SSqlObj* pParentObj) { if (pParentObj->retry > pParentObj->maxRetry) { tscError("0x%"PRIx64" max retry reached, abort the retry effort", pParentObj->self); return false; } + bool ret = true; + + { pthread_mutex_lock(&pParentObj->subState.mutex); - for (int32_t i = 0; i < numOfSub; ++i) { + for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) { int32_t code = pParentObj->pSubs[i]->res.code; if (code == TSDB_CODE_SUCCESS) { continue; @@ -3391,22 +3455,23 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_APP_NOT_READY) { pParentObj->res.code = code; - return false; + ret = false; + break; } } - return true; + pthread_mutex_unlock(&pParentObj->subState.mutex); } + + return ret; } static void doFreeInsertSupporter(SSqlObj* pSqlObj) { - if (pSqlObj == NULL || pSqlObj->subState.numOfSub <= 0) { - return; - } - + pthread_mutex_lock(&pSqlObj->subState.mutex); for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) { SSqlObj* pSql = pSqlObj->pSubs[i]; tfree(pSql->param); } + pthread_mutex_unlock(&pSqlObj->subState.mutex); } static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { @@ -3442,7 +3507,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) // restore user defined fp pParentObj->fp = pParentObj->fetchFp; - int32_t numOfSub = pParentObj->subState.numOfSub; doFreeInsertSupporter(pParentObj); if (pParentObj->res.code == TSDB_CODE_SUCCESS) { @@ -3453,14 +3517,18 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; (*pParentObj->fp)(pParentObj->param, pParentObj, v); } else { - if (!needRetryInsert(pParentObj, numOfSub)) { + if (!needRetryInsert(pParentObj)) { tscAsyncResultOnError(pParentObj); return; } int32_t numOfFailed = 0; - for(int32_t i = 0; i < numOfSub; ++i) { + + { pthread_mutex_lock(&pParentObj->subState.mutex); + + for(int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) { SSqlObj* pSql = pParentObj->pSubs[i]; + if (!pSql) continue; if (pSql->res.code != TSDB_CODE_SUCCESS) { numOfFailed += 1; @@ -3470,14 +3538,16 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0); tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); - subquerySetState(pSql, &pParentObj->subState, i, 0); + subquerySetStateWithoutLock(pSql, &pParentObj->subState, i, 0); tscDebug("0x%"PRIx64", failed sub:%d, %p", pParentObj->self, i, pSql); } } + pthread_mutex_unlock(&pParentObj->subState.mutex); } + tscWarn("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self, - pParentObj->res.numOfRows, numOfFailed, numOfSub); + pParentObj->res.numOfRows, numOfFailed, pParentObj->subState.numOfSub); tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable before reparse sql", pParentObj->self, pParentObj->cmd.insertParam.numOfTables); for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { @@ -3544,8 +3614,15 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return pRes->code; } + uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref + if (!pSub) { + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + pRes->code = TSDB_CODE_FAILED; + return pRes->code; + } SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter)); pSup->idx = i; pSup->pSql = pSql; @@ -3555,43 +3632,33 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { if (pSub->res.code != TSDB_CODE_SUCCESS) { tscHandleInsertRetry(pSql, pSub); } + tscReleaseRefOfSubobj(pSub); // REL ref } return TSDB_CODE_SUCCESS; } - pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks); - assert(pSql->subState.numOfSub > 0); - pRes->code = TSDB_CODE_SUCCESS; - // the number of already initialized subqueries - int32_t numOfSub = 0; - - if (pSql->subState.states == NULL) { - pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); - if (pSql->subState.states == NULL) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; - } - - pthread_mutex_init(&pSql->subState.mutex, NULL); + int32_t code = doReInitSubState(pSql, (int32_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks)); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } - memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); - tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self); + uint32_t stateVersion = 0; + int32_t numOfSub = 0; + int errflag = 0; - pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); - if (pSql->pSubs == NULL) { - goto _error; - } + { pthread_mutex_lock(&pSql->subState.mutex); + assert(pSql->subState.numOfSub > 0); tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub); while(numOfSub < pSql->subState.numOfSub) { SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); if (pSupporter == NULL) { - goto _error; + errflag = 1; + break; } pSupporter->pSql = pSql; @@ -3600,7 +3667,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); if (pNew == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno)); - goto _error; + errflag = 1; + break; } /* @@ -3608,6 +3676,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { * the callback function (multiVnodeInsertFinalize) correctly. */ pNew->fetchFp = pNew->fp; + pSql->pSubs[numOfSub] = pNew; STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub); @@ -3618,28 +3687,44 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { } else { tscDebug("0x%"PRIx64" prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql->self, numOfSub, pSql->subState.numOfSub, tstrerror(pRes->code)); - goto _error; + errflag = 1; + break; } } - + if (numOfSub < pSql->subState.numOfSub) { tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; + errflag = 1; + } + + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + + pthread_mutex_unlock(&pSql->subState.mutex); } + if (errflag) { + goto _error; } pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); // use the local variable - for (int32_t j = 0; j < numOfSub; ++j) { - SSqlObj *pSub = pSql->pSubs[j]; + for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) { + SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, j, stateVersion); // ACQ ref + if (!pSub) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + return TSDB_CODE_FAILED; + } tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j); tscBuildAndSendRequest(pSub, NULL); + tscReleaseRefOfSubobj(pSub); // REL ref } return TSDB_CODE_SUCCESS; - _error: +_error: return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -3663,6 +3748,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); int32_t numOfRes = INT32_MAX; + + { pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -3673,6 +3761,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { numOfRes = (int32_t)(MIN(numOfRes, remain)); } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (numOfRes == 0) { // no result any more, free all subquery objects pSql->res.completed = true; freeJoinSubqueryObj(pSql); @@ -3700,6 +3790,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { tscRestoreFuncForSTableQuery(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo); + + { pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -3735,6 +3828,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { assert(pSub->res.row <= pSub->res.numOfRows); } + pthread_mutex_unlock(&pSql->subState.mutex); } + pRes->numOfRows = numOfRes; pRes->numOfClauseTotal += numOfRes; @@ -3847,6 +3942,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); + { pthread_mutex_lock(&pSql->subState.mutex); + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { bool allSubqueryExhausted = true; @@ -3893,6 +3990,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { } } + pthread_mutex_unlock(&pSql->subState.mutex); } + return hasData; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b0f9e67349b7b693bee6048c4e1797fcd10d10c9..19088d161e3fef1763c3c6363f8cac5158800648 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1647,7 +1647,10 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { pCmd->insertParam.tagData.dataLen = 0; tscFreeQueryInfo(pCmd, clearCachedMeta, id); - pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); + SHashObj *pmap = pCmd->pTableMetaMap; + if (pmap == atomic_val_compare_exchange_ptr(&pCmd->pTableMetaMap, pmap, NULL)) { + tscCleanupTableMetaMap(pCmd->pTableMetaMap); + } taosReleaseRef(tscObjRef, id); } @@ -1678,33 +1681,30 @@ void tscFreeSqlResult(SSqlObj* pSql) { } void tscFreeSubobj(SSqlObj* pSql) { + pthread_mutex_lock(&pSql->subState.mutex); if (pSql->subState.numOfSub == 0) { - return; + goto _out; } - - pthread_mutex_lock(&pSql->subState.mutex); + pSql->subState.version ++; tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - if (pSql->pSubs[i] != NULL) { - tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i); - } else { - /* just for python error test case */ + if (!pSql->pSubs[i]) { tscDebug("0x%"PRIx64" free sub SqlObj:0x0, index:%d", pSql->self, i); + continue; } + tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i); taos_free_result(pSql->pSubs[i]); pSql->pSubs[i] = NULL; } + tfree(pSql->pSubs); tfree(pSql->subState.states); pSql->subState.numOfSub = 0; - tfree(pSql->pSubs); - +_out: pthread_mutex_unlock(&pSql->subState.mutex); - - pthread_mutex_destroy(&pSql->subState.mutex); } /** @@ -1741,6 +1741,52 @@ void tscFreeMetaSqlObj(int64_t *rid){ } } +SSqlObj* tscAllocSqlObj() { + SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); + if (!pNew) { + return NULL; + } + int rc = tsem_init(&pNew->rspSem, 0, 0); + assert(rc == 0 && "tsem_init failure"); + rc = pthread_mutex_init(&pNew->subState.mutex, NULL); + assert(rc == 0 && "pthread_mutex_init failure"); + return pNew; +} + +SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx, uint32_t stateVersion) { + assert (pSql != NULL); + SSqlObj *pSub = NULL; + + { pthread_mutex_lock(&pSql->subState.mutex); + if (stateVersion != tscGetVersionOfSubStateWithoutLock(pSql) || + idx < 0 || + idx >= pSql->subState.numOfSub || + !pSql->pSubs[idx]) { + goto _out; + } + pSub = taosAcquireRef(tscObjRef, pSql->pSubs[idx]->self); + assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch"); + +_out: + pthread_mutex_unlock(&pSql->subState.mutex); } + return pSub; +} + +void tscReleaseRefOfSubobj(SSqlObj* pSub) { + assert (pSub != NULL && pSub->self != 0 && "Subquery obj not refcounted"); + taosReleaseRef(tscObjRef, pSub->self); +} + +uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql) { + return pSql->subState.version; +} + +void tscResetAllSubStates(SSqlObj* pSql) { + pthread_mutex_lock(&pSql->subState.mutex); + memset(pSql->subState.states, 0, sizeof(pSql->subState.states[0]) * pSql->subState.numOfSub); + pthread_mutex_unlock(&pSql->subState.mutex); +} + void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; @@ -1764,14 +1810,11 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscFreeSubobj(pSql); - if (pSql && (pSql == pSql->rootObj)) { - pthread_mutex_destroy(&pSql->mtxSubs); - } - pSql->signature = NULL; pSql->fp = NULL; tfree(pSql->sqlstr); tfree(pSql->pBuf); + pSql->self = 0; tscFreeSqlResult(pSql); @@ -1782,6 +1825,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql); + pthread_mutex_destroy(&pSql->subState.mutex); tsem_destroy(&pSql->rspSem); memset(pSql, 0, sizeof(*pSql)); free(pSql); @@ -3889,7 +3933,7 @@ void registerSqlObj(SSqlObj* pSql) { } SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd) { - SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); + SSqlObj* pNew = tscAllocSqlObj(); if (pNew == NULL) { tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, 0); return NULL; @@ -3901,7 +3945,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in SSqlCmd* pCmd = &pNew->cmd; pCmd->command = cmd; - tsem_init(&pNew->rspSem, 0 ,0); if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { tscFreeSqlObj(pNew); @@ -3966,7 +4009,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t ui SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql) { SSqlCmd* pCmd = &pSql->cmd; - SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); + SSqlObj* pNew = tscAllocSqlObj(); if (pNew == NULL) { tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, tableIndex); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -3980,7 +4023,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNew->signature = pNew; pNew->sqlstr = strdup(pSql->sqlstr); pNew->rootObj = pSql->rootObj; - tsem_init(&pNew->rspSem, 0, 0); SSqlCmd* pnCmd = &pNew->cmd; memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); @@ -4305,26 +4347,25 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { return; } - taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); } -int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { - //bug fix. Above doInitSubState level, the loop invocation with the same SSqlObj will be fail. - //assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL); +int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { tscFreeSubobj(pSql); - - pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES); - pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); + int32_t code = TSDB_CODE_SUCCESS; - int32_t code = pthread_mutex_init(&pSql->subState.mutex, NULL); - if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != 0) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } + { pthread_mutex_lock(&pSql->subState.mutex); + pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES); + pSql->subState.states = calloc(numOfSubqueries, sizeof(int8_t)); + if (pSql->pSubs == NULL || pSql->subState.states == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + } pSql->subState.numOfSub = numOfSubqueries; + pSql->subState.version ++; - return TSDB_CODE_SUCCESS; + pthread_mutex_unlock(&pSql->subState.mutex); } + return code; } // do execute the query according to the query execution plan @@ -4349,21 +4390,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { // upstream may be freed before retry if (pQueryInfo->pUpstream && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly - code = doInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream)); + code = doReInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream)); if (code != TSDB_CODE_SUCCESS) { goto _error; } + uint32_t stateVersion = 0; + int errflag = 0; + { pthread_mutex_lock(&pSql->subState.mutex); + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); pSql->cmd.active = pSub; pSql->cmd.command = TSDB_SQL_SELECT; - SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); + SSqlObj* pNew = tscAllocSqlObj(); if (pNew == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; + errflag = 1; + break; } pNew->pTscObj = pSql->pTscObj; @@ -4376,24 +4422,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { pNew->cmd.resColumnId = TSDB_RES_COL_ID; - tsem_init(&pNew->rspSem, 0, 0); - SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id if (ps == NULL) { tscFreeSqlObj(pNew); - goto _error; + errflag = 1; + break; } ps->pParentSql = pSql; ps->subqueryIndex = i; - pNew->param = ps; + + registerSqlObj(pNew); + pSql->pSubs[i] = pNew; SSqlCmd* pCmd = &pNew->cmd; pCmd->command = TSDB_SQL_SELECT; if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { - goto _error; + errflag = 1; + break; } SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); @@ -4403,9 +4451,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { numOfInit++; } + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + + pthread_mutex_unlock(&pSql->subState.mutex); } + if (errflag) { + goto _error; + } + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* psub = pSql->pSubs[i]; - registerSqlObj(psub); + SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref + if (!psub) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + code = TSDB_CODE_FAILED; + goto _error; + } // create sub query to handle the sub query. SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); @@ -4415,30 +4477,21 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; } executeQuery(psub, pq); + + tscReleaseRefOfSubobj(psub); // REL ref } return; - } else if (hasMoreClauseToTry(pSql)) { - if (pthread_mutex_init(&pSql->subState.mutex, NULL) != 0) { - goto _error; - } } pSql->cmd.active = pQueryInfo; doExecuteQuery(pSql, pQueryInfo); return; - _error: - - for(int32_t i = 0; i < numOfInit; ++i) { - SSqlObj* p = pSql->pSubs[i]; - tscFreeSqlObj(p); - } - +_error: pSql->res.code = code; - pSql->subState.numOfSub = 0; // not initialized sub query object will not be freed - tfree(pSql->subState.states); - tfree(pSql->pSubs); + tscFreeSubobj(pSql); + tscAsyncResultOnError(pSql); } @@ -4682,7 +4735,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { * * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. */ - pSql->subState.numOfSub = 0; + tscFreeSubobj(pSql); pCmd->command = TSDB_SQL_SELECT; tscResetForNextRetrieve(pRes); diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index cd030df1b4f5bb29b966dc6930037b127d6b768c..a8f536fb1335f04deb183f55138b17f3db1b5019 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -67,6 +67,8 @@ extern int32_t tsCompressColData; extern int32_t tsMaxNumOfDistinctResults; extern char tsTempDir[]; extern int32_t tsShortcutFlag; +extern int32_t tsMaxSqlGroups; +extern int8_t tsSortWhenGroupBy; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 1c997eb97103ca0a6c2475056f977bf319450a3b..319503ee7057a5ae2bac336fbdc0b07e4a934a75 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -118,6 +118,12 @@ int32_t tsRetryStreamCompDelay = 30 * 60 * 1000; // The delayed computing ration. 10% of the whole computing time window by default. float tsStreamComputDelayRatio = 0.1f; +// max supported groups for group by clause / interval clause +int32_t tsMaxSqlGroups = 1000000; + +// order by first group by column when group by +int8_t tsSortWhenGroupBy = 1; + int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance @@ -1782,6 +1788,26 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "maxSqlGroups"; + cfg.ptr = &tsMaxSqlGroups; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 500000; + cfg.maxValue = 10000000; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "sortWhenGroupBy"; + cfg.ptr = &tsSortWhenGroupBy; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + #ifdef TD_TSZ // lossy compress cfg.option = "lossyColumns"; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index bb9ded8e40258060cc6a80c4bb0e2ecd42bc74a0..dca79babce9388d7f2d1750db2c9b8410b23c8d1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -621,7 +621,7 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult } // too many time window in query - if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) { + if (pResultRowInfo->size > tsMaxSqlGroups) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } @@ -1925,7 +1925,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo* pIn STimeWindow w = TSWINDOW_INITIALIZER; char* key = NULL; - int16_t num = 0; + int32_t num = 0; int32_t type = 0; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { buildGroupbyKeyBuf(pSDataBlock, pInfo, j, &key); @@ -7607,7 +7607,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { } initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo); - if (!pRuntimeEnv->pQueryAttr->stableQuery) { + if (!pRuntimeEnv->pQueryAttr->stableQuery && tsSortWhenGroupBy) { sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes, pInfo->binfo.pCtx); } diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index f207b9b5c4dd270cc4364e548449edcbee43ae4a..1ccaa48e7204a0a455e3c0d5fc73f2dfeecc6cc3 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -20,7 +20,7 @@ extern "C" { #endif -#define TSDB_CFG_MAX_NUM 138 +#define TSDB_CFG_MAX_NUM 140 #define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41 diff --git a/src/util/src/tref.c b/src/util/src/tref.c index bff8b12aaefc1734318e891efab7a9b02e6557f4..c4a44d71f1fc8921577ba115be210451880c0db0 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid) return taosDecRefCount(rsetId, rid, 1); } -// if rid is 0, return the first p in hash list, otherwise, return the next after current rid void *taosAcquireRef(int rsetId, int64_t rid) { int hash; diff --git a/tests/pytest/stable/insert.py b/tests/pytest/stable/insert.py index f46a6bcf4a0becf6cbee54281a9b3ecc862d63ce..2ee140debfed29189bbf7a81cb3617e831221768 100644 --- a/tests/pytest/stable/insert.py +++ b/tests/pytest/stable/insert.py @@ -96,6 +96,23 @@ class TDTestCase: tdSql.execute("drop stable if exists db.st") tdSql.execute("create table stb(ts timestamp, c1 int) tags(t1 int)") tdSql.error("create table `` using stb tags(1)") + + # TS-1760 + sql = "create table stb9 (ts timestamp" + for i in range(999): + sql += ", longcolumntest%d double" % i + sql += ") tags(t1 int)" + + tdSql.execute(sql) + tdSql.query("describe stb9") + tdSql.checkRows(1001) + tdSql.query("show create table stb9") + query = tdSql.getData(0, 1) + + tdSql.execute("drop table if exists stb9") + tdSql.execute(query) + tdSql.query("describe stb9") + tdSql.checkRows(1001) def stop(self): tdSql.close()