diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index f7832c9818226d19ba1e20cb47fa17c5bfb0f611..d3996ccf7fe85a6405addfcbef023aa2ae6bf08b 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -36,7 +36,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); -int32_t tscHandleInsertRetry(SSqlObj* pSql); +int32_t tscHandleInsertRetry(SSqlObj* parent, SSqlObj* child); void tscBuildResFromSubqueries(SSqlObj *pSql); TAOS_ROW doSetResultRowData(SSqlObj *pSql); diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 2c8641da760fd2e1f340cc4627559ec1a226686c..eddfa62966113d1a2befd2ed409d04c1e0665a61 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -110,11 +110,12 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint uint32_t offset); void* tscDestroyBlockArrayList(SArray* pDataBlockList); +void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable); + int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList); -int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, - STableDataBlocks** dataBlocks); +int32_t tscMergeTableDataBlocks(SSqlObj* pSql); +int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, + STableDataBlocks** dataBlocks, SArray* pBlockList); /** * for the projection query on metric or point interpolation query on metric, @@ -275,6 +276,8 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); bool hasMoreVnodesToTry(SSqlObj *pSql); bool hasMoreClauseToTry(SSqlObj* pSql); +void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache); + void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 748a9b299626c285e5089af848bed9e0ad26e0a8..bdb35cb07274a880c98aa2c93005c47fe05c7de2 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -37,40 +37,6 @@ extern "C" { #include "qTsbuf.h" #include "tcmdtype.h" -#if 0 -static UNUSED_FUNC void *u_malloc (size_t __size) { - uint32_t v = rand(); - - if (v % 5000 <= 0) { - return NULL; - } else { - return malloc(__size); - } -} - -static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) { - uint32_t v = rand(); - if (v % 5000 <= 0) { - return NULL; - } else { - return calloc(num, __size); - } -} - -static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { - uint32_t v = rand(); - if (v % 5000 <= 0) { - return NULL; - } else { - return realloc(p, __size); - } -} - -#define calloc u_calloc -#define malloc u_malloc -#define realloc u_realloc -#endif - // forward declaration struct SSqlInfo; struct SLocalReducer; @@ -78,7 +44,7 @@ struct SLocalReducer; // data source from sql string or from file enum { DATA_FROM_SQL_STRING = 1, - DATA_FROM_DATA_FILE = 2, + DATA_FROM_DATA_FILE = 2, }; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); @@ -118,10 +84,10 @@ typedef struct STableMetaInfo { * 1. keep the vgroup index during the multi-vnode super table projection query * 2. keep the vgroup index for multi-vnode insertion */ - int32_t vgroupIndex; - char name[TSDB_TABLE_FNAME_LEN]; // (super) table name - char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql - SArray* tagColList; // SArray, involved tag columns + int32_t vgroupIndex; + char name[TSDB_TABLE_FNAME_LEN]; // (super) table name + char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql + SArray *tagColList; // SArray, involved tag columns } STableMetaInfo; /* the structure for sql function in select clause */ @@ -136,7 +102,7 @@ typedef struct SSqlExpr { int16_t numOfParams; // argument value of each function tVariant param[3]; // parameters are not more than 3 int32_t offset; // sub result column value of arithmetic expression. - int16_t resColId; // result column id + int16_t resColId; // result column id } SSqlExpr; typedef struct SColumnIndex { @@ -204,22 +170,17 @@ typedef struct SParamInfo { } SParamInfo; typedef struct STableDataBlocks { - char tableId[TSDB_TABLE_FNAME_LEN]; - int8_t tsSource; // where does the UNIX timestamp come from, server or client - bool ordered; // if current rows are ordered or not - int64_t vgId; // virtual group id - int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending - int32_t numOfTables; // number of tables in current submit block - int32_t rowSize; // row size for current table - uint32_t nAllocSize; - uint32_t headerSize; // header for table info (uid, tid, submit metadata) - uint32_t size; - - /* - * the table meta of table, the table meta will be used during submit, keep a ref - * to avoid it to be removed from cache - */ - STableMeta *pTableMeta; + char tableId[TSDB_TABLE_FNAME_LEN]; + int8_t tsSource; // where does the UNIX timestamp come from, server or client + bool ordered; // if current rows are ordered or not + int64_t vgId; // virtual group id + int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending + int32_t numOfTables; // number of tables in current submit block + int32_t rowSize; // row size for current table + uint32_t nAllocSize; + uint32_t headerSize; // header for table info (uid, tid, submit metadata) + uint32_t size; + STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache char *pData; // for parameter ('?') binding @@ -252,7 +213,7 @@ typedef struct SQueryInfo { int64_t clauseLimit; // limit for current sub clause int64_t prjOffset; // offset value in the original sql expression, only applied at client side - int64_t tableLimit; // table limit in case of super table projection query + global order + limit + int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int16_t resColumnId; // result column id @@ -284,10 +245,14 @@ typedef struct { int32_t numOfParams; int8_t dataSourceType; // load data from file or not - int8_t submitSchema; // submit block is built with table schema - STagData *pTagData; // NOTE: pTagData->data is used as a variant length array - SHashObj *pTableList; // referred table involved in sql - SArray *pDataBlocks; // SArray submit data blocks after parsing sql + int8_t submitSchema; // submit block is built with table schema + STagData *pTagData; // NOTE: pTagData->data is used as a variant length array + + STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement. + int32_t numOfTables; + + SHashObj *pTableBlockHashList; // data block for each table + SArray *pDataBlocks; // SArray. Merged submit block for each vgroup } SSqlCmd; typedef struct SResRec { diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 6b2722b43f845f75bb0a6006cf8bc3f0b5dbc7aa..dfdf97ea432f276de0f73a102f87bb6fc42e4e3e 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -410,52 +410,26 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code != TSDB_CODE_SUCCESS) { tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code)); goto _error; - } else { - tscDebug("%p get %s successfully", pSql, msg); } + tscDebug("%p get %s successfully", pSql, msg); if (pSql->pStream == NULL) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); // check if it is a sub-query of super table query first, if true, enter another routine - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) { - tscDebug("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql); + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) { + tscDebug("%p update table meta in local cache, continue to process sql and send the corresponding query", pSql); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - return; - } else { - assert(code == TSDB_CODE_SUCCESS); - } - - // param already freed by other routine and pSql in tscCache when ctrl + c - if (atomic_load_ptr(&pSql->param) == NULL) { - return; - } - assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); + assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS); - SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; - SSqlObj * pParObj = trs->pParentSql; - - // NOTE: the vgroupInfo for the queried super table must be existed here. - assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex && - pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL); - - // tscProcessSql can add error into async res - tscProcessSql(pSql); - return; - } else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { - tscDebug("%p update table meta in local cache, continue to process sql and send corresponding tid_tag query", pSql); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return; - } else { - assert(code == TSDB_CODE_SUCCESS); } assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); + // tscProcessSql can add error into async res tscProcessSql(pSql); return; @@ -465,16 +439,18 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); + + assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return; - } else { - assert(code == TSDB_CODE_SUCCESS); } + assert(pCmd->command != TSDB_SQL_INSERT); + // in case of insert, redo parsing the sql string and build new submit data block for two reasons: // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. // 2. vnode may need the schema information along with submit block to update its local table schema. - if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) { + if (pCmd->command == TSDB_SQL_SELECT) { tscDebug("%p redo parse sql string and proceed", pSql); pCmd->parseFinished = false; tscResetSqlCmdObj(pCmd, false); @@ -486,16 +462,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - if (pCmd->command == TSDB_SQL_INSERT) { - /* - * Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks, - * and send the required submit block according to index value in supporter to server. - */ - pSql->fp = pSql->fetchFp; // restore the fp - tscHandleInsertRetry(pSql); - } else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue - tscProcessSql(pSql); - } + tscProcessSql(pSql); + }else { // in all other cases, simple retry tscProcessSql(pSql); } @@ -551,6 +519,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (!pSql->cmd.parseFinished) { tsParseSql(pSql, false); } + (*pSql->fp)(pSql->param, pSql, code); return; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 7921399330f876a217b8f14f4871d7f47cebec21..66b9eed21174de02df9afc8ff0aa7e681bc36929 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2589,10 +2589,11 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) { // all data are null, set it completed if (pInfo->numOfElems == 0) { pResInfo->complete = true; + } else { + pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, GET_DOUBLE_VAL(&pInfo->minval), GET_DOUBLE_VAL(&pInfo->maxval)); } pInfo->stage += 1; - pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, GET_DOUBLE_VAL(&pInfo->minval), GET_DOUBLE_VAL(&pInfo->maxval)); } else { pResInfo->complete = true; } diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 7fc5b8debb82d18f3067286eea5571c748229072..a99918975e11a1f51bc93eebf6053cfd8dca05c3 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -726,10 +726,14 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); SSchema p1 = {0}; - if (pExpr->colInfo.colIndex != TSDB_TBNAME_COLUMN_INDEX) { - p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex); - } else { + if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) { p1 = tGetTableNameColumnSchema(); + } else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { + p1.bytes = pExpr->resBytes; + p1.type = (uint8_t) pExpr->resType; + tstrncpy(p1.name, pExpr->aliasName, tListLen(p1.name)); + } else { + p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex); } int32_t inter = 0; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 18e5b6f0744a06e5d28f2ce5cfa3df1244a0dc89..9d04a5c13a5befd02517f94eedf7724a85fa8326 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -686,17 +686,14 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) { } } -static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd, - int32_t *totalNum) { - SSqlCmd * pCmd = &pSql->cmd; +static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColInfo *spd, int32_t *totalNum) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; + STableMeta *pTableMeta = pTableMetaInfo->pTableMeta; STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableDataBlocks *dataBuf = NULL; - int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, - pTableMeta, &dataBuf); + int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, + sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf, NULL); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -1058,18 +1055,17 @@ int tsParseInsertSql(SSqlObj *pSql) { return code; } - if (NULL == pCmd->pTableList) { - pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES); - if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) { + if (NULL == pCmd->pTableBlockHashList) { + pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (NULL == pCmd->pTableBlockHashList) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; + goto _clean; } } else { str = pCmd->curSql; } - tscDebug("%p create data block list for submit data:%p, pTableList:%p", pSql, pCmd->pDataBlocks, pCmd->pTableList); + tscDebug("%p create data block list hashList:%p", pSql, pCmd->pTableBlockHashList); while (1) { int32_t index = 0; @@ -1091,7 +1087,7 @@ int tsParseInsertSql(SSqlObj *pSql) { */ if (totalNum == 0) { code = TSDB_CODE_TSC_INVALID_SQL; - goto _error; + goto _clean; } else { break; } @@ -1104,11 +1100,11 @@ int tsParseInsertSql(SSqlObj *pSql) { // Check if the table name available or not if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) { code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z); - goto _error; + goto _clean; } if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { @@ -1122,12 +1118,12 @@ int tsParseInsertSql(SSqlObj *pSql) { tscError("%p async insert parse error, code:%s", pSql, tstrerror(code)); pCmd->curSql = NULL; - goto _error; + goto _clean; } if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); - goto _error; + goto _clean; } index = 0; @@ -1136,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) { if (sToken.n == 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z); - goto _error; + goto _clean; } STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -1148,32 +1144,32 @@ int tsParseInsertSql(SSqlObj *pSql) { tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } /* * app here insert data in different vnodes, so we need to set the following * data in another submit procedure using async insert routines */ - code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); + code = doParseInsertStatement(pCmd, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } } else if (sToken.type == TK_FILE) { if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } index = 0; sToken = tStrGetToken(str, &index, false, 0, NULL); if (sToken.type != TK_STRING && sToken.type != TK_ID) { code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); - goto _error; + goto _clean; } str += index; if (sToken.n == 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); - goto _error; + goto _clean; } strncpy(pCmd->payload, sToken.z, sToken.n); @@ -1183,7 +1179,7 @@ int tsParseInsertSql(SSqlObj *pSql) { wordexp_t full_path; if (wordexp(pCmd->payload, &full_path, 0) != 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z); - goto _error; + goto _clean; } tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize); @@ -1195,7 +1191,7 @@ int tsParseInsertSql(SSqlObj *pSql) { SSchema * pSchema = tscGetTableSchema(pTableMeta); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } SParsedDataColInfo spd = {0}; @@ -1230,7 +1226,7 @@ int tsParseInsertSql(SSqlObj *pSql) { if (spd.hasVal[t] == true) { code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z); - goto _error; + goto _clean; } spd.hasVal[t] = true; @@ -1241,13 +1237,13 @@ int tsParseInsertSql(SSqlObj *pSql) { if (!findColumnIndex) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z); - goto _error; + goto _clean; } } if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) { code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z); - goto _error; + goto _clean; } index = 0; @@ -1256,16 +1252,16 @@ int tsParseInsertSql(SSqlObj *pSql) { if (sToken.type != TK_VALUES) { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z); - goto _error; + goto _clean; } - code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); + code = doParseInsertStatement(pCmd, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } } else { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z); - goto _error; + goto _clean; } } @@ -1274,25 +1270,18 @@ int tsParseInsertSql(SSqlObj *pSql) { goto _clean; } - if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId - if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { - goto _error; + if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId + if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) { + goto _clean; } } code = TSDB_CODE_SUCCESS; goto _clean; -_error: - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - _clean: - taosHashCleanup(pCmd->pTableList); - pCmd->pTableList = NULL; - - pCmd->curSql = NULL; + pCmd->curSql = NULL; pCmd->parseFinished = 1; - return code; } @@ -1373,6 +1362,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { pSql->parseRetry++; ret = tscToSQLCmd(pSql, &SQLInfo); } + SQLInfoDestroy(&SQLInfo); } @@ -1399,7 +1389,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL); } - if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { + if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 30e07294272fa2c8aae823bee428320455d9eb0f..8134a35811017e58f3b38a3601627f5ad24590a7 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -800,9 +800,9 @@ static int insertStmtExecute(STscStmt* stmt) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); assert(pCmd->numOfClause == 1); - if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { + if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgid - int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks); + int code = tscMergeTableDataBlocks(stmt->pSql); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index dd336bcf051ea28f172ad31beb10eee09c0f2549..c74d0ef2cf14ed2fcd7520a870ca48c51f059039 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1310,7 +1310,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t SColumnIndex index = {.tableIndex = tableIndex}; SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double), - -1000, sizeof(double), false); + getNewResColId(pQueryInfo), sizeof(double), false); char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z; size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1); @@ -5317,7 +5317,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn // keep original limitation value in globalLimit pQueryInfo->clauseLimit = pQueryInfo->limit.limit; pQueryInfo->prjOffset = pQueryInfo->limit.offset; - pQueryInfo->tableLimit = -1; + pQueryInfo->vgroupLimit = -1; if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { /* @@ -5327,7 +5327,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn * than or equal to the value of limit. */ if (pQueryInfo->limit.limit > 0) { - pQueryInfo->tableLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset; + pQueryInfo->vgroupLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset; pQueryInfo->limit.limit = -1; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 59bcdd691d6a8cfe2f2e65da0c87da9c232fee57..960f2561e294c1aad2be6218143166dbd5d9cc46 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -280,19 +280,19 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } int32_t cmd = pCmd->command; - if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && + + // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema + if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + pSql->cmd.submitSchema = 1; + } + + if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || - rpcMsg->code == TSDB_CODE_APP_NOT_READY || - rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) { + rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); - // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema - if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - pSql->cmd.submitSchema = 1; - } - pSql->res.code = rpcMsg->code; // keep the previous error code if (pSql->retry > pSql->maxRetry) { tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); @@ -451,10 +451,10 @@ int tscProcessSql(SSqlObj *pSql) { int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; - pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - pRetrieveMsg->free = htons(pQueryInfo->type); + pRetrieveMsg->free = htons(pQueryInfo->type); + pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); // todo valid the vgroupId at the client side STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -681,7 +681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->queryType = htonl(pQueryInfo->type); - pQueryMsg->tableLimit = htobe64(pQueryInfo->tableLimit); + pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number @@ -1394,6 +1394,43 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } +//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { +// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload; +// pCancelMsg->qhandle = htobe64(pSql->res.qhandle); +// +// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); +// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); +// +// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { +// int32_t vgIndex = pTableMetaInfo->vgroupIndex; +// if (pTableMetaInfo->pVgroupTables == NULL) { +// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; +// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); +// +// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); +// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex); +// } else { +// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); +// assert(vgIndex >= 0 && vgIndex < numOfVgroups); +// +// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); +// +// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); +// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex); +// } +// } else { +// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; +// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); +// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); +// } +// +// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg); +// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY; +// +// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg)); +// return TSDB_CODE_SUCCESS; +//} + int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SAlterDbMsg); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 8e165241f0930bca07ae0a9765a4013fd80c85bc..d7dec2f35643dfb68e66e3d8197f3a735d5a700e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -900,9 +900,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) { strtolower(pSql->sqlstr, sql); pCmd->curSql = NULL; - if (NULL != pCmd->pTableList) { - taosHashCleanup(pCmd->pTableList); - pCmd->pTableList = NULL; + if (NULL != pCmd->pTableBlockHashList) { + taosHashCleanup(pCmd->pTableBlockHashList); + pCmd->pTableBlockHashList = NULL; } pSql->fp = asyncCallback; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 819a323db5e947df46158b9665c0f5d5c491f532..973f21c92b50570a69a4cab2b096877006e3d4e4 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2149,6 +2149,29 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } } +static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { + if (pParentObj->retry > pParentObj->maxRetry) { + tscError("%p max retry reached, abort the retry effort", pParentObj) + return false; + } + + for (int32_t i = 0; i < numOfSub; ++i) { + int32_t code = pParentObj->pSubs[i]->res.code; + if (code == TSDB_CODE_SUCCESS) { + continue; + } + + if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE && code != TSDB_CODE_TDB_INVALID_TABLE_ID && + code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && + code != TSDB_CODE_APP_NOT_READY) { + pParentObj->res.code = code; + return false; + } + } + + return true; +} + static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; @@ -2163,23 +2186,80 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) assert(pSql != NULL && pSql->res.code == numOfRows); pParentObj->res.code = pSql->res.code; - } - tfree(pSupporter); + // set the flag in the parent sqlObj + if (pSql->cmd.submitSchema) { + pParentObj->cmd.submitSchema = 1; + } + } if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { return; } - - tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); // restore user defined fp pParentObj->fp = pParentObj->fetchFp; + int32_t numOfSub = pParentObj->subState.numOfSub; + + if (pParentObj->res.code == TSDB_CODE_SUCCESS) { + tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); + for(int32_t i = 0; i < numOfSub; ++i) { + SSqlObj* pSql = pParentObj->pSubs[i]; + tfree(pSql->param); + } + + // todo remove this parameter in async callback function definition. + // all data has been sent to vnode, call user function + int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; + (*pParentObj->fp)(pParentObj->param, pParentObj, v); + } else { + if (!needRetryInsert(pParentObj, numOfSub)) { + tscQueueAsyncRes(pParentObj); + return; + } + + int32_t numOfFailed = 0; + for(int32_t i = 0; i < numOfSub; ++i) { + SSqlObj* pSql = pParentObj->pSubs[i]; + if (pSql->res.code != TSDB_CODE_SUCCESS) { + numOfFailed += 1; + + // clean up tableMeta in cache + tscFreeQueryInfo(&pSql->cmd, true); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0); + STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0); + tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); - // todo remove this parameter in async callback function definition. - // all data has been sent to vnode, call user function - int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; - (*pParentObj->fp)(pParentObj->param, pParentObj, v); + tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql); + } + } + + tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj, + pParentObj->res.numOfRows, numOfFailed, numOfSub); + + tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables); + for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) { + taosCacheRelease(tscMetaCache, (void**)&(pParentObj->cmd.pTableMetaList[i]), true); + } + + pParentObj->cmd.parseFinished = false; + pParentObj->subState.numOfRemain = numOfFailed; + pParentObj->subState.numOfSub = numOfFailed; + + tscResetSqlCmdObj(&pParentObj->cmd, false); + + tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry++); + int32_t code = tsParseSql(pParentObj, true); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; + + if (code != TSDB_CODE_SUCCESS) { + pParentObj->res.code = code; + tscQueueAsyncRes(pParentObj); + return; + } + + tscDoQuery(pParentObj); + } } /** @@ -2187,19 +2267,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) * @param pSql * @return */ -int32_t tscHandleInsertRetry(SSqlObj* pSql) { +int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { assert(pSql != NULL && pSql->param != NULL); - SSqlCmd* pCmd = &pSql->cmd; +// SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; assert(pSupporter->index < pSupporter->pSql->subState.numOfSub); - STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); + STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); // free the data block created from insert sql string - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); +// pCmd->pDataBlocks = tscDestroyBlockArrayList(pParent->cmd.pDataBlocks); if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); @@ -2213,6 +2293,20 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; + // it is the failure retry insert + if (pSql->pSubs != NULL) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + + tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, i); + if (pSub->res.code != TSDB_CODE_SUCCESS) { + tscHandleInsertRetry(pSql, pSub); + } + } + + return TSDB_CODE_SUCCESS; + } + pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); assert(pSql->subState.numOfSub > 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a98132d319cf5403503c415d99c5fd0a72fc1941..dbd626d360fa6948396753bb7c76da7e5f8dcf23 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -333,13 +333,15 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (isNull(p, TSDB_DATA_TYPE_NCHAR)) { memcpy(dst, p, varDataTLen(p)); - } else { + } else if (varDataLen(p) > 0) { int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst)); varDataSetLen(dst, length); if (length == 0) { tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p); } + } else { + varDataSetLen(dst, 0); } p += pInfo->field.bytes; @@ -377,7 +379,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free } -static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) { +void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) { if (pCmd == NULL || pCmd->numOfClause == 0) { return; } @@ -403,12 +405,12 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) { pCmd->msgType = 0; pCmd->parseFinished = 0; pCmd->autoCreated = 0; - - taosHashCleanup(pCmd->pTableList); - pCmd->pTableList = NULL; - - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + pCmd->numOfTables = 0; + + tfree(pCmd->pTableMetaList); + pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList); + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); tscFreeQueryInfo(pCmd, removeFromCache); } @@ -575,6 +577,21 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { return NULL; } +void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable) { + if (pBlockHashTable == NULL) { + return NULL; + } + + STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL); + while(p) { + tscDestroyDataBlock(*p); + p = taosHashIterate(pBlockHashTable, p); + } + + taosHashCleanup(pBlockHashTable); + return NULL; +} + int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { SSqlCmd* pCmd = &pSql->cmd; assert(pDataBlock->pTableMeta != NULL); @@ -671,9 +688,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff return TSDB_CODE_SUCCESS; } -int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, - STableDataBlocks** dataBlocks) { +int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, + STableDataBlocks** dataBlocks, SArray* pBlockList) { *dataBlocks = NULL; STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); @@ -688,7 +704,9 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t } taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); - taosArrayPush(pDataBlockList, dataBlocks); + if (pBlockList) { + taosArrayPush(pBlockList, dataBlocks); + } } return TSDB_CODE_SUCCESS; @@ -769,22 +787,37 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { +static void extractTableMeta(SSqlCmd* pCmd) { + pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->pTableBlockHashList); + pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES); + + STableDataBlocks **p1 = taosHashIterate(pCmd->pTableBlockHashList, NULL); + int32_t i = 0; + while(p1) { + STableDataBlocks* pBlocks = *p1; + pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta); + p1 = taosHashIterate(pCmd->pTableBlockHashList, p1); + } + + pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList); +} + +int32_t tscMergeTableDataBlocks(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); - size_t total = taosArrayGetSize(pTableDataBlockList); - for (int32_t i = 0; i < total; ++i) { + STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL); + + STableDataBlocks* pOneTableBlock = *p; + while(pOneTableBlock) { // the maximum expanded size in byte when a row-wise data is converted to SDataRow format - STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta); STableDataBlocks* dataBuf = NULL; - int32_t ret = - tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, - tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf); + int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, + tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList); if (ret != TSDB_CODE_SUCCESS) { tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); taosHashCleanup(pVnodeDataBlockHashList); @@ -839,14 +872,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); dataBuf->numOfTables += 1; + + p = taosHashIterate(pCmd->pTableBlockHashList, p); + if (p == NULL) { + break; + } + + pOneTableBlock = *p; } - tscDestroyBlockArrayList(pTableDataBlockList); + extractTableMeta(pCmd); // free the table data blocks; pCmd->pDataBlocks = pVnodeDataBlockList; - -// tscFreeUnusedDataBlocks(pCmd->pDataBlocks); taosHashCleanup(pVnodeDataBlockHashList); return TSDB_CODE_SUCCESS; @@ -2023,6 +2061,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNewQueryInfo->limit = pQueryInfo->limit; pNewQueryInfo->slimit = pQueryInfo->slimit; pNewQueryInfo->order = pQueryInfo->order; + pNewQueryInfo->vgroupLimit = pQueryInfo->vgroupLimit; pNewQueryInfo->tsBuf = NULL; pNewQueryInfo->fillType = pQueryInfo->fillType; pNewQueryInfo->fillVal = NULL; diff --git a/src/common/inc/tcmdtype.h b/src/common/inc/tcmdtype.h index 69bbccd67e79eb77e3afd0523dec765e53a5cfa1..bec85905369b5a1ea4274a9d799c664ce7aee544 100644 --- a/src/common/inc/tcmdtype.h +++ b/src/common/inc/tcmdtype.h @@ -36,7 +36,7 @@ enum { TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" ) - + // the SQL below is for mgmt node TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) diff --git a/src/dnode/inc/dnodeVRead.h b/src/dnode/inc/dnodeVRead.h index 30dfb1b3a47509c1c3c5568520de0f092adba2c4..9c88886f88bedaa63a5071b9dd2d773d4ff1cc0c 100644 --- a/src/dnode/inc/dnodeVRead.h +++ b/src/dnode/inc/dnodeVRead.h @@ -24,8 +24,10 @@ extern "C" { int32_t dnodeInitVRead(); void dnodeCleanupVRead(); void dnodeDispatchToVReadQueue(SRpcMsg *pMsg); -void * dnodeAllocVReadQueue(void *pVnode); -void dnodeFreeVReadQueue(void *pRqueue); +void * dnodeAllocVQueryQueue(void *pVnode); +void * dnodeAllocVFetchQueue(void *pVnode); +void dnodeFreeVQueryQueue(void *pQqueue); +void dnodeFreeVFetchQueue(void *pFqueue); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c index 3c975f5cf9be7c7293ae08e101262b730e4a1817..18235f7835425f6730ca95b8ac5e62e5ca84f586 100644 --- a/src/dnode/src/dnodeVMgmt.c +++ b/src/dnode/src/dnodeVMgmt.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tqueue.h" +#include "tworker.h" #include "dnodeVMgmt.h" typedef struct { @@ -23,9 +24,8 @@ typedef struct { char pCont[]; } SMgmtMsg; -static taos_qset tsMgmtQset = NULL; -static taos_queue tsMgmtQueue = NULL; -static pthread_t tsQthread; +static SWorkerPool tsVMgmtWP; +static taos_queue tsVMgmtQueue = NULL; static void * dnodeProcessMgmtQueue(void *param); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); @@ -47,45 +47,23 @@ int32_t dnodeInitVMgmt() { int32_t code = vnodeInitMgmt(); if (code != TSDB_CODE_SUCCESS) return -1; - tsMgmtQset = taosOpenQset(); - if (tsMgmtQset == NULL) { - dError("failed to create the vmgmt queue set"); - return -1; - } - - tsMgmtQueue = taosOpenQueue(); - if (tsMgmtQueue == NULL) { - dError("failed to create the vmgmt queue"); - return -1; - } - - taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL); + tsVMgmtWP.name = "vmgmt"; + tsVMgmtWP.workerFp = dnodeProcessMgmtQueue; + tsVMgmtWP.min = 1; + tsVMgmtWP.max = 1; + if (tWorkerInit(&tsVMgmtWP) != 0) return -1; - pthread_attr_t thAttr; - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - - code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL); - pthread_attr_destroy(&thAttr); - if (code != 0) { - dError("failed to create thread to process vmgmt queue, reason:%s", strerror(errno)); - return -1; - } + tsVMgmtQueue = tWorkerAllocQueue(&tsVMgmtWP, NULL); dInfo("dnode vmgmt is initialized"); return TSDB_CODE_SUCCESS; } void dnodeCleanupVMgmt() { - if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset); - if (tsQthread) pthread_join(tsQthread, NULL); - - if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue); - if (tsMgmtQset) taosCloseQset(tsMgmtQset); - - tsMgmtQset = NULL; - tsMgmtQueue = NULL; + tWorkerFreeQueue(&tsVMgmtWP, tsVMgmtQueue); + tWorkerCleanup(&tsVMgmtWP); + tsVMgmtQueue = NULL; vnodeCleanupMgmt(); } @@ -97,7 +75,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { pMgmt->rpcMsg = *pMsg; pMgmt->rpcMsg.pCont = pMgmt->pCont; memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen); - taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt); + taosWriteQitem(tsVMgmtQueue, TAOS_QTYPE_RPC, pMgmt); return TSDB_CODE_SUCCESS; } @@ -112,16 +90,18 @@ void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static void *dnodeProcessMgmtQueue(void *param) { - SMgmtMsg *pMgmt; - SRpcMsg * pMsg; - SRpcMsg rsp = {0}; - int32_t qtype; - void * handle; +static void *dnodeProcessMgmtQueue(void *wparam) { + SWorker * pWorker = wparam; + SWorkerPool *pPool = pWorker->pPool; + SMgmtMsg * pMgmt; + SRpcMsg * pMsg; + SRpcMsg rsp = {0}; + int32_t qtype; + void * handle; while (1) { - if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) { - dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset); + if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) { + dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset); break; } diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 07496b142a210904e9842c0fe7a0ad613e41ba7a..0d4add2a5c4c793667bb36300fbe3baf1894ce6a 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -16,66 +16,35 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tqueue.h" +#include "tworker.h" #include "dnodeVRead.h" -typedef struct { - pthread_t thread; // thread - int32_t workerId; // worker ID -} SVReadWorker; - -typedef struct { - int32_t max; // max number of workers - int32_t min; // min number of workers - int32_t num; // current number of workers - SVReadWorker * worker; - pthread_mutex_t mutex; -} SVReadWorkerPool; - static void *dnodeProcessReadQueue(void *pWorker); // module global variable -static SVReadWorkerPool tsVReadWP; -static taos_qset tsVReadQset; +static SWorkerPool tsVQueryWP; +static SWorkerPool tsVFetchWP; int32_t dnodeInitVRead() { - tsVReadQset = taosOpenQset(); - - tsVReadWP.min = tsNumOfCores; - tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore; - if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min; - tsVReadWP.worker = calloc(sizeof(SVReadWorker), tsVReadWP.max); - pthread_mutex_init(&tsVReadWP.mutex, NULL); - - if (tsVReadWP.worker == NULL) return -1; - for (int i = 0; i < tsVReadWP.max; ++i) { - SVReadWorker *pWorker = tsVReadWP.worker + i; - pWorker->workerId = i; - } + tsVQueryWP.name = "vquery"; + tsVQueryWP.workerFp = dnodeProcessReadQueue; + tsVQueryWP.min = tsNumOfCores; + tsVQueryWP.max = tsNumOfCores/* * tsNumOfThreadsPerCore*/; +// if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min; + if (tWorkerInit(&tsVQueryWP) != 0) return -1; + + tsVFetchWP.name = "vfetch"; + tsVFetchWP.workerFp = dnodeProcessReadQueue; + tsVFetchWP.min = MIN(4, tsNumOfCores); + tsVFetchWP.max = tsVFetchWP.min; + if (tWorkerInit(&tsVFetchWP) != 0) return -1; - dInfo("dnode vread is initialized, min worker:%d max worker:%d", tsVReadWP.min, tsVReadWP.max); return 0; } void dnodeCleanupVRead() { - for (int i = 0; i < tsVReadWP.max; ++i) { - SVReadWorker *pWorker = tsVReadWP.worker + i; - if (pWorker->thread) { - taosQsetThreadResume(tsVReadQset); - } - } - - for (int i = 0; i < tsVReadWP.max; ++i) { - SVReadWorker *pWorker = tsVReadWP.worker + i; - if (pWorker->thread) { - pthread_join(pWorker->thread, NULL); - } - } - - free(tsVReadWP.worker); - taosCloseQset(tsVReadQset); - pthread_mutex_destroy(&tsVReadWP.mutex); - - dInfo("dnode vread is closed"); + tWorkerCleanup(&tsVFetchWP); + tWorkerCleanup(&tsVQueryWP); } void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { @@ -88,6 +57,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); + assert(pHead->contLen > 0); void *pVnode = vnodeAcquire(pHead->vgId); if (pVnode != NULL) { int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg); @@ -107,43 +77,20 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -void *dnodeAllocVReadQueue(void *pVnode) { - pthread_mutex_lock(&tsVReadWP.mutex); - taos_queue queue = taosOpenQueue(); - if (queue == NULL) { - pthread_mutex_unlock(&tsVReadWP.mutex); - return NULL; - } - - taosAddIntoQset(tsVReadQset, queue, pVnode); - - // spawn a thread to process queue - if (tsVReadWP.num < tsVReadWP.max) { - do { - SVReadWorker *pWorker = tsVReadWP.worker + tsVReadWP.num; - - pthread_attr_t thAttr; - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - - if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) { - dError("failed to create thread to process vread vqueue since %s", strerror(errno)); - } - - pthread_attr_destroy(&thAttr); - tsVReadWP.num++; - dDebug("dnode vread worker:%d is launched, total:%d", pWorker->workerId, tsVReadWP.num); - } while (tsVReadWP.num < tsVReadWP.min); - } +void *dnodeAllocVQueryQueue(void *pVnode) { + return tWorkerAllocQueue(&tsVQueryWP, pVnode); +} - pthread_mutex_unlock(&tsVReadWP.mutex); - dDebug("pVnode:%p, dnode vread queue:%p is allocated", pVnode, queue); +void *dnodeAllocVFetchQueue(void *pVnode) { + return tWorkerAllocQueue(&tsVFetchWP, pVnode); +} - return queue; +void dnodeFreeVQueryQueue(void *pQqueue) { + tWorkerFreeQueue(&tsVQueryWP, pQqueue); } -void dnodeFreeVReadQueue(void *pRqueue) { - taosCloseQueue(pRqueue); +void dnodeFreeVFetchQueue(void *pFqueue) { + tWorkerFreeQueue(&tsVFetchWP, pFqueue); } void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { @@ -160,18 +107,20 @@ void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) { } -static void *dnodeProcessReadQueue(void *pWorker) { - SVReadMsg *pRead; - int32_t qtype; - void * pVnode; +static void *dnodeProcessReadQueue(void *wparam) { + SWorker * pWorker = wparam; + SWorkerPool *pPool = pWorker->pPool; + SVReadMsg * pRead; + int32_t qtype; + void * pVnode; while (1) { - if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pRead, &pVnode) == 0) { - dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset); + if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pRead, &pVnode) == 0) { + dDebug("dnode vquery got no message from qset:%p, exiting", pPool->qset); break; } - dTrace("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle, + dTrace("msg:%p, app:%p type:%s will be processed in vquery queue, qtype:%d", pRead, pRead->rpcAhandle, taosMsg[pRead->msgType], qtype); int32_t code = vnodeProcessRead(pVnode, pRead); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index cb7e9f0b0d1267c7879a63ca1cd1a9f15883d996..dd41360e68e1868dc1b947cfc5ab4188e96fc320 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -49,8 +49,10 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeAllocVWriteQueue(void *pVnode); void dnodeFreeVWriteQueue(void *pWqueue); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code); -void *dnodeAllocVReadQueue(void *pVnode); -void dnodeFreeVReadQueue(void *pRqueue); +void *dnodeAllocVQueryQueue(void *pVnode); +void *dnodeAllocVFetchQueue(void *pVnode); +void dnodeFreeVQueryQueue(void *pQqueue); +void dnodeFreeVFetchQueue(void *pFqueue); int32_t dnodeAllocateMPeerQueue(); void dnodeFreeMPeerQueue(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a2d2e2ebbd9009918c2c0758db0daed41d45de61..27d857ce1401ff74ecff72e92354f1ee19d568eb 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -473,7 +473,7 @@ typedef struct { int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; int16_t orderType; // used in group by xx order by xxx - int64_t tableLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. + int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. int16_t prjOrder; // global order in super table projection query. int64_t limit; int64_t offset; diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 33bb2c278c4693347b1641349de8cd269229e77a..95f1d27b591c2503461826e175c3e768c0db77cc 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -34,6 +34,7 @@ typedef struct { void * rpcHandle; void * rpcAhandle; void * qhandle; + void * pVnode; int8_t qtype; int8_t msgType; SRspRet rspRet; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b73f7ce3f5cad809c12fcf17e5c57a79811605fd..9b29ad909a8eb81ea32442dd463102f0fb9460b8 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -140,6 +140,11 @@ typedef struct SQueryCostInfo { uint64_t numOfTimeWindows; } SQueryCostInfo; +typedef struct { + int64_t vgroupLimit; + int64_t ts; +} SOrderedPrjQueryInfo; + typedef struct SQuery { int16_t numOfCols; int16_t numOfTags; @@ -167,6 +172,7 @@ typedef struct SQuery { tFilePage** sdata; STableQueryInfo* current; + SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -185,7 +191,7 @@ typedef struct SQueryRuntimeEnv { void* pQueryHandle; void* pSecQueryHandle; // another thread for bool stableQuery; // super table query or not - bool topBotQuery; // false + bool topBotQuery; // TODO used bitwise flag bool groupbyNormalCol; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation @@ -210,14 +216,13 @@ enum { typedef struct SQInfo { void* signature; int32_t code; // error code to returned to client - int64_t owner; // if it is in execution + int64_t owner; // if it is in execution void* tsdb; SMemRef memRef; int32_t vgId; STableGroupInfo tableGroupInfo; // table list SArray STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure SQueryRuntimeEnv runtimeEnv; -// SArray* arrTableIdInfo; SHashObj* arrTableIdInfo; int32_t groupIndex; @@ -233,6 +238,7 @@ typedef struct SQInfo { tsem_t ready; int32_t dataReady; // denote if query result is ready or not void* rspContext; // response context + int64_t startExecTs; // start to exec timestamp } SQInfo; #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index a45c0ac6ef666db3ce17165525b8eeacf1fff6d6..92de3fb84a30ffc194ee6880f22c969b91bc40eb 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -128,11 +128,14 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) +#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) static void setQueryStatus(SQuery *pQuery, int8_t status); static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); -#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) +static int32_t getMaximumIdleDurationSec() { + return tsShellActivityTimer * 2; +} static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -2138,8 +2141,31 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); } +static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) { + return pQInfo->rspContext != NULL; +} + #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) +static bool isQueryKilled(SQInfo *pQInfo) { + if (IS_QUERY_KILLED(pQInfo)) { + return true; + } + + // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived + // abort current query execution. + if (pQInfo->owner != 0 && ((taosGetTimestampSec() - pQInfo->startExecTs) > getMaximumIdleDurationSec()) && + (!needBuildResAfterQueryComplete(pQInfo))) { + + assert(pQInfo->startExecTs != 0); + qDebug("QInfo:%p retrieve not arrive beyond %d sec, abort current query execution, start:%"PRId64", current:%d", pQInfo, 1, + pQInfo->startExecTs, taosGetTimestampSec()); + return true; + } + + return false; +} + static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { @@ -2864,7 +2890,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; - if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -3432,7 +3458,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { int64_t startt = taosGetTimestampMs(); while (1) { - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); tfree(pTableList); @@ -4018,7 +4044,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { cond.twindow.skey, cond.twindow.ekey); // check if query is killed or not - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } } @@ -4675,7 +4701,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; while (tsdbNextDataBlock(pQueryHandle)) { - if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5112,7 +5138,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5479,19 +5505,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // return; // } + if (pQuery->prjInfo.vgroupLimit != -1) { + assert(pQuery->limit.limit == -1 && pQuery->limit.offset == 0); + } else if (pQuery->limit.limit != -1) { + assert(pQuery->prjInfo.vgroupLimit == -1); + } + bool hasMoreBlock = true; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); SQueryCostInfo *summary = &pRuntimeEnv->summary; while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) { summary->totalBlocks += 1; - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); STableQueryInfo **pTableQueryInfo = - (STableQueryInfo **)taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); + (STableQueryInfo **) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); if (pTableQueryInfo == NULL) { break; } @@ -5503,6 +5535,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) { setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb); } + if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->current->windowResInfo.size > pQuery->prjInfo.vgroupLimit) { + pQuery->current->lastKey = + QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; + continue; + } + + // it is a super table ordered projection query, check for the number of output for each vgroup + if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->rec.rows >= pQuery->prjInfo.vgroupLimit) { + if (QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.skey >= pQuery->prjInfo.ts) { + pQuery->current->lastKey = + QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; + continue; + } else if (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.ekey <= pQuery->prjInfo.ts) { + pQuery->current->lastKey = + QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; + continue; + } + } + uint32_t status = 0; SDataStatis *pStatis = NULL; SArray *pDataBlock = NULL; @@ -5520,6 +5571,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } ensureOutputBuffer(pRuntimeEnv, &blockInfo); + int64_t prev = getNumOfResult(pRuntimeEnv); + pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); @@ -5530,17 +5583,30 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQuery->rec.rows = getNumOfResult(pRuntimeEnv); + int64_t inc = pQuery->rec.rows - prev; + pQuery->current->windowResInfo.size += (int32_t) inc; + // the flag may be set by tableApplyFunctionsOnBlock, clear it here CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED); updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); - skipResults(pRuntimeEnv); - // the limitation of output result is reached, set the query completed - if (limitResults(pRuntimeEnv)) { - setQueryStatus(pQuery, QUERY_COMPLETED); - SET_STABLE_QUERY_OVER(pQInfo); - break; + if (pQuery->prjInfo.vgroupLimit >= 0) { + if (((pQuery->rec.rows + pQuery->rec.total) < pQuery->prjInfo.vgroupLimit) || ((pQuery->rec.rows + pQuery->rec.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) { + if (QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts < blockInfo.window.ekey) { + pQuery->prjInfo.ts = blockInfo.window.ekey; + } else if (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts > blockInfo.window.skey) { + pQuery->prjInfo.ts = blockInfo.window.skey; + } + } + } else { + // the limitation of output result is reached, set the query completed + skipResults(pRuntimeEnv); + if (limitResults(pRuntimeEnv)) { + setQueryStatus(pQuery, QUERY_COMPLETED); + SET_STABLE_QUERY_OVER(pQInfo); + break; + } } // while the output buffer is full or limit/offset is applied, query may be paused here @@ -5582,7 +5648,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5768,7 +5834,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el); // query error occurred or query is killed, abort current execution - if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { + if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5789,7 +5855,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { setQueryStatus(pQuery, QUERY_COMPLETED); - if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { + if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); //TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -5905,7 +5971,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) pQuery->rec.rows = getNumOfResult(pRuntimeEnv); doSecondaryArithmeticProcess(pQuery); - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -6284,7 +6350,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); pQueryMsg->limit = htobe64(pQueryMsg->limit); pQueryMsg->offset = htobe64(pQueryMsg->offset); - pQueryMsg->tableLimit = htobe64(pQueryMsg->tableLimit); + pQueryMsg->vgroupLimit = htobe64(pQueryMsg->vgroupLimit); pQueryMsg->order = htons(pQueryMsg->order); pQueryMsg->orderColId = htons(pQueryMsg->orderColId); @@ -6885,6 +6951,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->tagColList = pTagCols; + pQuery->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit; + pQuery->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX; pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQuery->colList == NULL) { @@ -7479,7 +7547,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) { pthread_mutex_lock(&pQInfo->lock); pQInfo->dataReady = QUERY_RESULT_READY; - buildRes = (pQInfo->rspContext != NULL); + buildRes = needBuildResAfterQueryComplete(pQInfo); // clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is // put into task to be executed. @@ -7488,6 +7556,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) { pthread_mutex_unlock(&pQInfo->lock); + // used in retrieve blocking model. tsem_post(&pQInfo->ready); return buildRes; } @@ -7504,7 +7573,9 @@ bool qTableQuery(qinfo_t qinfo) { return false; } - if (IS_QUERY_KILLED(pQInfo)) { + pQInfo->startExecTs = taosGetTimestampSec(); + + if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); return doBuildResCheck(pQInfo); } @@ -7536,7 +7607,7 @@ bool qTableQuery(qinfo_t qinfo) { } SQuery* pQuery = pRuntimeEnv->pQuery; - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p query is killed", pQInfo); } else if (pQuery->rec.rows == 0) { qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); @@ -7564,30 +7635,31 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex int32_t code = TSDB_CODE_SUCCESS; -#if _NON_BLOCKING_RETRIEVE - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - pthread_mutex_lock(&pQInfo->lock); - assert(pQInfo->rspContext == NULL); - - if (pQInfo->dataReady == QUERY_RESULT_READY) { + if (tsHalfCoresForQuery) { + pQInfo->rspContext = pRspContext; + tsem_wait(&pQInfo->ready); *buildRes = true; - qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, - pQInfo->code); + code = pQInfo->code; } else { - *buildRes = false; - qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); - pQInfo->rspContext = pRspContext; - assert(pQInfo->rspContext != NULL); - } + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - code = pQInfo->code; - pthread_mutex_unlock(&pQInfo->lock); -#else - tsem_wait(&pQInfo->ready); - *buildRes = true; - code = pQInfo->code; -#endif + pthread_mutex_lock(&pQInfo->lock); + + assert(pQInfo->rspContext == NULL); + if (pQInfo->dataReady == QUERY_RESULT_READY) { + *buildRes = true; + qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->rowSize, + pQuery->rec.rows, tstrerror(pQInfo->code)); + } else { + *buildRes = false; + qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); + pQInfo->rspContext = pRspContext; + assert(pQInfo->rspContext != NULL); + } + + code = pQInfo->code; + pthread_mutex_unlock(&pQInfo->lock); + } return code; } @@ -7655,7 +7727,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) { } SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - return IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER); + return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER); } int32_t qKillQuery(qinfo_t qinfo) { @@ -7952,8 +8024,6 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { return NULL; } - const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2 * 1000; - SQueryMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo); @@ -7969,7 +8039,8 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { return NULL; } else { TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo; - void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_QHANDLE_LIFE_SPAN); + void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), + (getMaximumIdleDurationSec()*1000)); // pthread_mutex_unlock(&pQueryMgmt->lock); return handle; diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index 3bdc0d477f777faa71a8e5d2264021932dddeea4..51125d62b92e022369d9332b78cb12a8650c75d9 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -234,7 +234,13 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { } int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { - double v = GET_DOUBLE_VAL(value); + double v = 0; + if (pBucket->type == TSDB_DATA_TYPE_FLOAT) { + v = GET_FLOAT_VAL(value); + } else { + v = GET_DOUBLE_VAL(value); + } + int32_t index = -1; if (pBucket->range.dMinVal == DBL_MAX) { diff --git a/src/util/inc/tworker.h b/src/util/inc/tworker.h new file mode 100644 index 0000000000000000000000000000000000000000..7bc1eba2fdcca0409737d9305911cd2987afe1ab --- /dev/null +++ b/src/util/inc/tworker.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TWORKER_H +#define TDENGINE_TWORKER_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void *(*FWorkerThread)(void *pWorker); +struct SWorkerPool; + +typedef struct { + pthread_t thread; // thread + int32_t id; // worker ID + struct SWorkerPool *pPool; +} SWorker; + +typedef struct SWorkerPool { + int32_t max; // max number of workers + int32_t min; // min number of workers + int32_t num; // current number of workers + void * qset; + char * name; + SWorker *worker; + FWorkerThread workerFp; + pthread_mutex_t mutex; +} SWorkerPool; + +int32_t tWorkerInit(SWorkerPool *pPool); +void tWorkerCleanup(SWorkerPool *pPool); +void * tWorkerAllocQueue(SWorkerPool *pPool, void *ahandle); +void tWorkerFreeQueue(SWorkerPool *pPool, void *pQueue); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/util/src/tworker.c b/src/util/src/tworker.c new file mode 100644 index 0000000000000000000000000000000000000000..8b4053bccd1ce8d9d3f58328d838f4ba5132a100 --- /dev/null +++ b/src/util/src/tworker.c @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tulog.h" +#include "tqueue.h" +#include "tworker.h" + +int32_t tWorkerInit(SWorkerPool *pPool) { + pPool->qset = taosOpenQset(); + pPool->worker = calloc(sizeof(SWorker), pPool->max); + pthread_mutex_init(&pPool->mutex, NULL); + for (int i = 0; i < pPool->max; ++i) { + SWorker *pWorker = pPool->worker + i; + pWorker->id = i; + pWorker->pPool = pPool; + } + + uInfo("worker:%s is initialized, min:%d max:%d", pPool->name, pPool->min, pPool->max); + return 0; +} + +void tWorkerCleanup(SWorkerPool *pPool) { + for (int i = 0; i < pPool->max; ++i) { + SWorker *pWorker = pPool->worker + i; + if(taosCheckPthreadValid(pWorker->thread)) { + taosQsetThreadResume(pPool->qset); + } + } + + for (int i = 0; i < pPool->max; ++i) { + SWorker *pWorker = pPool->worker + i; + if (taosCheckPthreadValid(pWorker->thread)) { + pthread_join(pWorker->thread, NULL); + } + } + + free(pPool->worker); + taosCloseQset(pPool->qset); + pthread_mutex_destroy(&pPool->mutex); + + uInfo("worker:%s is closed", pPool->name); +} + +void *tWorkerAllocQueue(SWorkerPool *pPool, void *ahandle) { + pthread_mutex_lock(&pPool->mutex); + taos_queue pQueue = taosOpenQueue(); + if (pQueue == NULL) { + pthread_mutex_unlock(&pPool->mutex); + return NULL; + } + + taosAddIntoQset(pPool->qset, pQueue, ahandle); + + // spawn a thread to process queue + if (pPool->num < pPool->max) { + do { + SWorker *pWorker = pPool->worker + pPool->num; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, pPool->workerFp, pWorker) != 0) { + uError("worker:%s:%d failed to create thread to process since %s", pPool->name, pWorker->id, strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + pPool->num++; + uDebug("worker:%s:%d is launched, total:%d", pPool->name, pWorker->id, pPool->num); + } while (pPool->num < pPool->min); + } + + pthread_mutex_unlock(&pPool->mutex); + uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pPool->name, pQueue, ahandle); + + return pQueue; +} + +void tWorkerFreeQueue(SWorkerPool *pPool, void *pQueue) { + taosCloseQueue(pQueue); + uDebug("worker:%s, queue:%p is freed", pPool->name, pQueue); +} diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 401c217b9a972f0929f38b76b88aac508f106723..34f7d64ed11ea509d0a87ab07108d2bd3ee5b318 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -47,8 +47,9 @@ typedef struct { int8_t isCommiting; uint64_t version; // current version uint64_t fversion; // version on saved data file - void * wqueue; - void * rqueue; + void * wqueue; // write queue + void * qqueue; // read query queue + void * fqueue; // read fetch/cancel queue void * wal; void * tsdb; int64_t sync; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 36983c1cf0e4b7bc3f86eb11132cc844f2a7e9be..3a603466f4c02b01624eda462986e917f93917ea 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -212,8 +212,9 @@ int32_t vnodeOpen(int32_t vgId) { pVnode->fversion = pVnode->version; pVnode->wqueue = dnodeAllocVWriteQueue(pVnode); - pVnode->rqueue = dnodeAllocVReadQueue(pVnode); - if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) { + pVnode->qqueue = dnodeAllocVQueryQueue(pVnode); + pVnode->fqueue = dnodeAllocVFetchQueue(pVnode); + if (pVnode->wqueue == NULL || pVnode->qqueue == NULL || pVnode->fqueue == NULL) { vnodeCleanUp(pVnode); return terrno; } @@ -373,9 +374,14 @@ void vnodeDestroy(SVnodeObj *pVnode) { pVnode->wqueue = NULL; } - if (pVnode->rqueue) { - dnodeFreeVReadQueue(pVnode->rqueue); - pVnode->rqueue = NULL; + if (pVnode->qqueue) { + dnodeFreeVQueryQueue(pVnode->qqueue); + pVnode->qqueue = NULL; + } + + if (pVnode->fqueue) { + dnodeFreeVFetchQueue(pVnode->fqueue); + pVnode->fqueue = NULL; } tfree(pVnode->rootDir); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 34921a93b344657889ae868d06fe22864495969e..03d1272771d599e9115896148007c9b1fe310151 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -14,7 +14,6 @@ */ #define _DEFAULT_SOURCE -#define _NON_BLOCKING_RETRIEVE 0 #include "os.h" #include "taosmsg.h" #include "tqueue.h" @@ -25,12 +24,12 @@ static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); + static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); int32_t vnodeInitRead(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; - return 0; } @@ -115,13 +114,16 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt } pRead->qtype = qtype; - atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->queuedRMsg, 1); - vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); - taosWriteQitem(pVnode->rqueue, qtype, pRead); - return TSDB_CODE_SUCCESS; + if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { + vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + return taosWriteQitem(pVnode->fqueue, qtype, pRead); + } else { + vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + return taosWriteQitem(pVnode->qqueue, qtype, pRead); + } } static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { @@ -197,26 +199,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // qHandle needs to be freed correctly if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pRead->pCont; - killQueryMsg->free = htons(killQueryMsg->free); - killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); - - vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pRead->rpcHandle); - assert(pRead->contLen > 0 && killQueryMsg->free == 1); - - void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle); - if (qhandle == NULL || *qhandle == NULL) { - vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle, - pRead->rpcHandle); - } else { - assert(*qhandle == (void *)killQueryMsg->qhandle); - - qKillQuery(*qhandle); - qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true); - } - - return TSDB_CODE_TSC_QUERY_CANCELLED; + vError("error rpc msg in query, %s", tstrerror(pRead->code)); } +// assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL); +// if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { +// SCancelQueryMsg *pMsg = (SCancelQueryMsg *)pRead->pCont; +//// pMsg->free = htons(killQueryMsg->free); +// pMsg->qhandle = htobe64(pMsg->qhandle); +// +// vWarn("QInfo:%p connection %p broken, kill query", (void *)pMsg->qhandle, pRead->rpcHandle); +//// assert(pRead->contLen > 0 && pMsg->free == 1); +// +// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pMsg->qhandle); +// if (qhandle == NULL || *qhandle == NULL) { +// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pMsg->qhandle, pRead->rpcHandle); +// } else { +// assert(*qhandle == (void *)pMsg->qhandle); +// +// qKillQuery(*qhandle); +// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true); +// } +// +// return TSDB_CODE_TSC_QUERY_CANCELLED; +// } int32_t code = TSDB_CODE_SUCCESS; void ** handle = NULL; @@ -338,11 +343,12 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { } if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, invalid handle in retrieving result, code:0x%08x, QInfo:%p", pVnode->vgId, code, (void *)pRetrieve->qhandle); + vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle); vnodeBuildNoResultQueryRsp(pRet); return code; } - + + // kill current query and free corresponding resources. if (pRetrieve->free == 1) { vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); qKillQuery(*handle); @@ -373,8 +379,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); freeHandle = true; } else { // result is not ready, return immediately - assert(buildRes == true); - // Only effects in the non-blocking model if (!tsHalfCoresForQuery) { if (!buildRes) { @@ -401,12 +405,11 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // notify connection(handle) that current qhandle is created, if current connection from // client is broken, the query needs to be killed immediately. int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { - SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); - killQueryMsg->qhandle = htobe64((uint64_t)qhandle); - killQueryMsg->free = htons(1); - killQueryMsg->header.vgId = htonl(vgId); - killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); + SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); + pMsg->qhandle = htobe64((uint64_t)qhandle); + pMsg->header.vgId = htonl(vgId); + pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); - return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg)); + return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); } diff --git a/tests/Jenkinsfile b/tests/Jenkinsfile index 49e25a2f5ece27f23f18babfefe48a17cc8f2037..f687cd0699f9407c7665259f801b75659e6c7f24 100644 --- a/tests/Jenkinsfile +++ b/tests/Jenkinsfile @@ -1,6 +1,14 @@ + +// execute this before anything else, including requesting any time on an agent +if (currentBuild.rawBuild.getCauses().toString().contains('BranchIndexingCause')) { + print "INFO: Build skipped due to trigger being Branch Indexing" + currentBuild.result = 'ABORTED' // optional, gives a better hint to the user that it's been skipped, rather than the default which shows it's successful + return +} properties([pipelineTriggers([githubPush()])]) node { - git url: 'https://github.com/liuyq-617/TDengine' + git url: 'https://github.com/taosdata/TDengine.git' + } def pre_test(){ diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 74a49288e9a1aa7081db45925bf52d6516e4801a..54e81d33b9c7a948dc75996828bc07634d4664ae 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -50,7 +50,7 @@ static void queryDB(TAOS *taos, char *command) { taos_free_result(pSql); } -void Test(char *qstr, const char *input, int i); +void Test(TAOS *taos, char *qstr, int i); int main(int argc, char *argv[]) { char qstr[1024]; @@ -63,21 +63,22 @@ int main(int argc, char *argv[]) { // init TAOS taos_init(); + TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + if (taos == NULL) { + printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/); + exit(1); + } for (int i = 0; i < 4000000; i++) { - Test(qstr, argv[1], i); + Test(taos, qstr, i); } + taos_close(taos); taos_cleanup(); } -void Test(char *qstr, const char *input, int index) { - TAOS *taos = taos_connect(input, "root", "taosdata", NULL, 0); +void Test(TAOS *taos, char *qstr, int index) { printf("==================test at %d\n================================", index); queryDB(taos, "drop database if exists demo"); queryDB(taos, "create database demo"); TAOS_RES *result; - if (taos == NULL) { - printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/); - exit(1); - } queryDB(taos, "use demo"); queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))"); @@ -131,6 +132,5 @@ void Test(char *qstr, const char *input, int index) { taos_free_result(result); printf("====demo end====\n\n"); - taos_close(taos); }