提交 dd614dd1 编写于 作者: P Ping Xiao

Merge branch 'develop' into xiaoping/add_test_case

......@@ -36,7 +36,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql);
int32_t tscHandleInsertRetry(SSqlObj* parent, SSqlObj* child);
void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql);
......
......@@ -110,11 +110,12 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
/**
* for the projection query on metric or point interpolation query on metric,
......@@ -275,6 +276,8 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
......
......@@ -37,40 +37,6 @@ extern "C" {
#include "qTsbuf.h"
#include "tcmdtype.h"
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return malloc(__size);
}
}
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return calloc(num, __size);
}
}
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return realloc(p, __size);
}
}
#define calloc u_calloc
#define malloc u_malloc
#define realloc u_realloc
#endif
// forward declaration
struct SSqlInfo;
struct SLocalReducer;
......@@ -78,7 +44,7 @@ struct SLocalReducer;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
DATA_FROM_DATA_FILE = 2,
};
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
......@@ -118,10 +84,10 @@ typedef struct STableMetaInfo {
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t vgroupIndex;
char name[TSDB_TABLE_FNAME_LEN]; // (super) table name
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray* tagColList; // SArray<SColumn*>, involved tag columns
int32_t vgroupIndex;
char name[TSDB_TABLE_FNAME_LEN]; // (super) table name
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
/* the structure for sql function in select clause */
......@@ -136,7 +102,7 @@ typedef struct SSqlExpr {
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
int16_t resColId; // result column id
} SSqlExpr;
typedef struct SColumnIndex {
......@@ -204,22 +170,17 @@ typedef struct SParamInfo {
} SParamInfo;
typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_FNAME_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table
uint32_t nAllocSize;
uint32_t headerSize; // header for table info (uid, tid, submit metadata)
uint32_t size;
/*
* the table meta of table, the table meta will be used during submit, keep a ref
* to avoid it to be removed from cache
*/
STableMeta *pTableMeta;
char tableId[TSDB_TABLE_FNAME_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table
uint32_t nAllocSize;
uint32_t headerSize; // header for table info (uid, tid, submit metadata)
uint32_t size;
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData;
// for parameter ('?') binding
......@@ -252,7 +213,7 @@ typedef struct SQueryInfo {
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t tableLimit; // table limit in case of super table projection query + global order + limit
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
......@@ -284,10 +245,14 @@ typedef struct {
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
int8_t submitSchema; // submit block is built with table schema
STagData *pTagData; // NOTE: pTagData->data is used as a variant length array
SHashObj *pTableList; // referred table involved in sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
int8_t submitSchema; // submit block is built with table schema
STagData *pTagData; // NOTE: pTagData->data is used as a variant length array
STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
} SSqlCmd;
typedef struct SResRec {
......
......@@ -410,52 +410,26 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code != TSDB_CODE_SUCCESS) {
tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
goto _error;
} else {
tscDebug("%p get %s successfully", pSql, msg);
}
tscDebug("%p get %s successfully", pSql, msg);
if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql);
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("%p update table meta in local cache, continue to process sql and send the corresponding query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
// param already freed by other routine and pSql in tscCache when ctrl + c
if (atomic_load_ptr(&pSql->param) == NULL) {
return;
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSql;
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
// tscProcessSql can add error into async res
tscProcessSql(pSql);
return;
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding tid_tag query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
// tscProcessSql can add error into async res
tscProcessSql(pSql);
return;
......@@ -465,16 +439,18 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
assert(pCmd->command != TSDB_SQL_INSERT);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
// 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) {
if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql);
pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd, false);
......@@ -486,16 +462,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
if (pCmd->command == TSDB_SQL_INSERT) {
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql->fp = pSql->fetchFp; // restore the fp
tscHandleInsertRetry(pSql);
} else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue
tscProcessSql(pSql);
}
tscProcessSql(pSql);
}else { // in all other cases, simple retry
tscProcessSql(pSql);
}
......@@ -551,6 +519,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (!pSql->cmd.parseFinished) {
tsParseSql(pSql, false);
}
(*pSql->fp)(pSql->param, pSql, code);
return;
......
......@@ -2589,10 +2589,11 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) {
// all data are null, set it completed
if (pInfo->numOfElems == 0) {
pResInfo->complete = true;
} else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, GET_DOUBLE_VAL(&pInfo->minval), GET_DOUBLE_VAL(&pInfo->maxval));
}
pInfo->stage += 1;
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, GET_DOUBLE_VAL(&pInfo->minval), GET_DOUBLE_VAL(&pInfo->maxval));
} else {
pResInfo->complete = true;
}
......
......@@ -726,10 +726,14 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema p1 = {0};
if (pExpr->colInfo.colIndex != TSDB_TBNAME_COLUMN_INDEX) {
p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
} else {
if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
p1 = tGetTableNameColumnSchema();
} else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
p1.bytes = pExpr->resBytes;
p1.type = (uint8_t) pExpr->resType;
tstrncpy(p1.name, pExpr->aliasName, tListLen(p1.name));
} else {
p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
}
int32_t inter = 0;
......
......@@ -686,17 +686,14 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
}
}
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd,
int32_t *totalNum) {
SSqlCmd * pCmd = &pSql->cmd;
static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColInfo *spd, int32_t *totalNum) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
pTableMeta, &dataBuf);
int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -1058,18 +1055,17 @@ int tsParseInsertSql(SSqlObj *pSql) {
return code;
}
if (NULL == pCmd->pTableList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
if (NULL == pCmd->pTableBlockHashList) {
pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (NULL == pCmd->pTableBlockHashList) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
goto _clean;
}
} else {
str = pCmd->curSql;
}
tscDebug("%p create data block list for submit data:%p, pTableList:%p", pSql, pCmd->pDataBlocks, pCmd->pTableList);
tscDebug("%p create data block list hashList:%p", pSql, pCmd->pTableBlockHashList);
while (1) {
int32_t index = 0;
......@@ -1091,7 +1087,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
*/
if (totalNum == 0) {
code = TSDB_CODE_TSC_INVALID_SQL;
goto _error;
goto _clean;
} else {
break;
}
......@@ -1104,11 +1100,11 @@ int tsParseInsertSql(SSqlObj *pSql) {
// Check if the table name available or not
if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
goto _error;
goto _clean;
}
if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _error;
goto _clean;
}
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
......@@ -1122,12 +1118,12 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscError("%p async insert parse error, code:%s", pSql, tstrerror(code));
pCmd->curSql = NULL;
goto _error;
goto _clean;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
goto _error;
goto _clean;
}
index = 0;
......@@ -1136,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
goto _error;
goto _clean;
}
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -1148,32 +1144,32 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error;
goto _clean;
}
/*
* app here insert data in different vnodes, so we need to set the following
* data in another submit procedure using async insert routines
*/
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
code = doParseInsertStatement(pCmd, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
goto _clean;
}
} else if (sToken.type == TK_FILE) {
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error;
goto _clean;
}
index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL);
if (sToken.type != TK_STRING && sToken.type != TK_ID) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error;
goto _clean;
}
str += index;
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error;
goto _clean;
}
strncpy(pCmd->payload, sToken.z, sToken.n);
......@@ -1183,7 +1179,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
wordexp_t full_path;
if (wordexp(pCmd->payload, &full_path, 0) != 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
goto _error;
goto _clean;
}
tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
......@@ -1195,7 +1191,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
SSchema * pSchema = tscGetTableSchema(pTableMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error;
goto _clean;
}
SParsedDataColInfo spd = {0};
......@@ -1230,7 +1226,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (spd.hasVal[t] == true) {
code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
goto _error;
goto _clean;
}
spd.hasVal[t] = true;
......@@ -1241,13 +1237,13 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (!findColumnIndex) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
goto _error;
goto _clean;
}
}
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
goto _error;
goto _clean;
}
index = 0;
......@@ -1256,16 +1252,16 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (sToken.type != TK_VALUES) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
goto _error;
goto _clean;
}
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
code = doParseInsertStatement(pCmd, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
goto _clean;
}
} else {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
goto _error;
goto _clean;
}
}
......@@ -1274,25 +1270,18 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean;
}
if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error;
if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
goto _clean;
}
}
code = TSDB_CODE_SUCCESS;
goto _clean;
_error:
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
_clean:
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL;
pCmd->curSql = NULL;
pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code;
}
......@@ -1373,6 +1362,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
}
SQLInfoDestroy(&SQLInfo);
}
......@@ -1399,7 +1389,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL);
}
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -800,9 +800,9 @@ static int insertStmtExecute(STscStmt* stmt) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
if (taosArrayGetSize(pCmd->pDataBlocks) > 0) {
if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) {
// merge according to vgid
int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks);
int code = tscMergeTableDataBlocks(stmt->pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -1310,7 +1310,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
SColumnIndex index = {.tableIndex = tableIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double),
-1000, sizeof(double), false);
getNewResColId(pQueryInfo), sizeof(double), false);
char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z;
size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1);
......@@ -5317,7 +5317,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
// keep original limitation value in globalLimit
pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
pQueryInfo->prjOffset = pQueryInfo->limit.offset;
pQueryInfo->tableLimit = -1;
pQueryInfo->vgroupLimit = -1;
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/*
......@@ -5327,7 +5327,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
* than or equal to the value of limit.
*/
if (pQueryInfo->limit.limit > 0) {
pQueryInfo->tableLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset;
pQueryInfo->vgroupLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset;
pQueryInfo->limit.limit = -1;
}
......
......@@ -280,19 +280,19 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
int32_t cmd = pCmd->command;
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
......@@ -451,10 +451,10 @@ int tscProcessSql(SSqlObj *pSql) {
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pRetrieveMsg->free = htons(pQueryInfo->type);
pRetrieveMsg->free = htons(pQueryInfo->type);
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
// todo valid the vgroupId at the client side
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -681,7 +681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->tableLimit = htobe64(pQueryInfo->tableLimit);
pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
......@@ -1394,6 +1394,43 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
// pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
//
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// int32_t vgIndex = pTableMetaInfo->vgroupIndex;
// if (pTableMetaInfo->pVgroupTables == NULL) {
// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
//
// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
// } else {
// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
// assert(vgIndex >= 0 && vgIndex < numOfVgroups);
//
// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
//
// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
// }
// } else {
// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
// }
//
// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
//
// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
// return TSDB_CODE_SUCCESS;
//}
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SAlterDbMsg);
......
......@@ -900,9 +900,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower(pSql->sqlstr, sql);
pCmd->curSql = NULL;
if (NULL != pCmd->pTableList) {
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL;
if (NULL != pCmd->pTableBlockHashList) {
taosHashCleanup(pCmd->pTableBlockHashList);
pCmd->pTableBlockHashList = NULL;
}
pSql->fp = asyncCallback;
......
......@@ -2149,6 +2149,29 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
}
}
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
if (pParentObj->retry > pParentObj->maxRetry) {
tscError("%p max retry reached, abort the retry effort", pParentObj)
return false;
}
for (int32_t i = 0; i < numOfSub; ++i) {
int32_t code = pParentObj->pSubs[i]->res.code;
if (code == TSDB_CODE_SUCCESS) {
continue;
}
if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE && code != TSDB_CODE_TDB_INVALID_TABLE_ID &&
code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL &&
code != TSDB_CODE_APP_NOT_READY) {
pParentObj->res.code = code;
return false;
}
}
return true;
}
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql;
......@@ -2163,23 +2186,80 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code;
}
tfree(pSupporter);
// set the flag in the parent sqlObj
if (pSql->cmd.submitSchema) {
pParentObj->cmd.submitSchema = 1;
}
}
if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) {
return;
}
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
// restore user defined fp
pParentObj->fp = pParentObj->fetchFp;
int32_t numOfSub = pParentObj->subState.numOfSub;
if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
for(int32_t i = 0; i < numOfSub; ++i) {
SSqlObj* pSql = pParentObj->pSubs[i];
tfree(pSql->param);
}
// todo remove this parameter in async callback function definition.
// all data has been sent to vnode, call user function
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else {
if (!needRetryInsert(pParentObj, numOfSub)) {
tscQueueAsyncRes(pParentObj);
return;
}
int32_t numOfFailed = 0;
for(int32_t i = 0; i < numOfSub; ++i) {
SSqlObj* pSql = pParentObj->pSubs[i];
if (pSql->res.code != TSDB_CODE_SUCCESS) {
numOfFailed += 1;
// clean up tableMeta in cache
tscFreeQueryInfo(&pSql->cmd, true);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
// todo remove this parameter in async callback function definition.
// all data has been sent to vnode, call user function
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql);
}
}
tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
pParentObj->res.numOfRows, numOfFailed, numOfSub);
tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables);
for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
taosCacheRelease(tscMetaCache, (void**)&(pParentObj->cmd.pTableMetaList[i]), true);
}
pParentObj->cmd.parseFinished = false;
pParentObj->subState.numOfRemain = numOfFailed;
pParentObj->subState.numOfSub = numOfFailed;
tscResetSqlCmdObj(&pParentObj->cmd, false);
tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry++);
int32_t code = tsParseSql(pParentObj, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) {
pParentObj->res.code = code;
tscQueueAsyncRes(pParentObj);
return;
}
tscDoQuery(pParentObj);
}
}
/**
......@@ -2187,19 +2267,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
* @param pSql
* @return
*/
int32_t tscHandleInsertRetry(SSqlObj* pSql) {
int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
assert(pSql != NULL && pSql->param != NULL);
SSqlCmd* pCmd = &pSql->cmd;
// SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index);
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
// free the data block created from insert sql string
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// pCmd->pDataBlocks = tscDestroyBlockArrayList(pParent->cmd.pDataBlocks);
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
......@@ -2213,6 +2293,20 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
// it is the failure retry insert
if (pSql->pSubs != NULL) {
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, i);
if (pSub->res.code != TSDB_CODE_SUCCESS) {
tscHandleInsertRetry(pSql, pSub);
}
}
return TSDB_CODE_SUCCESS;
}
pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks);
assert(pSql->subState.numOfSub > 0);
......
......@@ -333,13 +333,15 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
memcpy(dst, p, varDataTLen(p));
} else {
} else if (varDataLen(p) > 0) {
int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
varDataSetLen(dst, length);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
} else {
varDataSetLen(dst, 0);
}
p += pInfo->field.bytes;
......@@ -377,7 +379,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
}
static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
if (pCmd == NULL || pCmd->numOfClause == 0) {
return;
}
......@@ -403,12 +405,12 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
pCmd->msgType = 0;
pCmd->parseFinished = 0;
pCmd->autoCreated = 0;
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL;
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
pCmd->numOfTables = 0;
tfree(pCmd->pTableMetaList);
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeQueryInfo(pCmd, removeFromCache);
}
......@@ -575,6 +577,21 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
return NULL;
}
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable) {
if (pBlockHashTable == NULL) {
return NULL;
}
STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL);
while(p) {
tscDestroyDataBlock(*p);
p = taosHashIterate(pBlockHashTable, p);
}
taosHashCleanup(pBlockHashTable);
return NULL;
}
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd;
assert(pDataBlock->pTableMeta != NULL);
......@@ -671,9 +688,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_SUCCESS;
}
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks) {
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
......@@ -688,7 +704,9 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
}
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
taosArrayPush(pDataBlockList, dataBlocks);
if (pBlockList) {
taosArrayPush(pBlockList, dataBlocks);
}
}
return TSDB_CODE_SUCCESS;
......@@ -769,22 +787,37 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
static void extractTableMeta(SSqlCmd* pCmd) {
pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->pTableBlockHashList);
pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES);
STableDataBlocks **p1 = taosHashIterate(pCmd->pTableBlockHashList, NULL);
int32_t i = 0;
while(p1) {
STableDataBlocks* pBlocks = *p1;
pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta);
p1 = taosHashIterate(pCmd->pTableBlockHashList, p1);
}
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
size_t total = taosArrayGetSize(pTableDataBlockList);
for (int32_t i = 0; i < total; ++i) {
STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL);
STableDataBlocks* pOneTableBlock = *p;
while(pOneTableBlock) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
STableDataBlocks* dataBuf = NULL;
int32_t ret =
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf);
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosHashCleanup(pVnodeDataBlockHashList);
......@@ -839,14 +872,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
// the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1;
p = taosHashIterate(pCmd->pTableBlockHashList, p);
if (p == NULL) {
break;
}
pOneTableBlock = *p;
}
tscDestroyBlockArrayList(pTableDataBlockList);
extractTableMeta(pCmd);
// free the table data blocks;
pCmd->pDataBlocks = pVnodeDataBlockList;
// tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_SUCCESS;
......@@ -2023,6 +2061,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNewQueryInfo->limit = pQueryInfo->limit;
pNewQueryInfo->slimit = pQueryInfo->slimit;
pNewQueryInfo->order = pQueryInfo->order;
pNewQueryInfo->vgroupLimit = pQueryInfo->vgroupLimit;
pNewQueryInfo->tsBuf = NULL;
pNewQueryInfo->fillType = pQueryInfo->fillType;
pNewQueryInfo->fillVal = NULL;
......
......@@ -36,7 +36,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
// the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
......
......@@ -203,10 +203,10 @@ int32_t tsVersion = 0;
// log
int32_t tsNumOfLogLines = 10000000;
int32_t mDebugFlag = 135;
int32_t sdbDebugFlag = 135;
int32_t mDebugFlag = 131;
int32_t sdbDebugFlag = 131;
int32_t dDebugFlag = 135;
int32_t vDebugFlag = 135;
int32_t vDebugFlag = 131;
int32_t cDebugFlag = 131;
int32_t jniDebugFlag = 131;
int32_t odbcDebugFlag = 131;
......@@ -220,7 +220,7 @@ int32_t debugFlag = 0;
int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 135;
int32_t cqDebugFlag = 131;
int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL;
......@@ -416,7 +416,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "arbitrator";
cfg.ptr = tsArbitrator;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_EP_LEN;
......@@ -901,7 +901,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "timezone";
cfg.ptr = tsTimezone;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsTimezone);
......@@ -911,7 +911,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "locale";
cfg.ptr = tsLocale;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsLocale);
......@@ -921,7 +921,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "charset";
cfg.ptr = tsCharset;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsCharset);
......
......@@ -24,8 +24,10 @@ extern "C" {
int32_t dnodeInitVRead();
void dnodeCleanupVRead();
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *pRqueue);
void * dnodeAllocVQueryQueue(void *pVnode);
void * dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue);
#ifdef __cplusplus
}
......
......@@ -121,7 +121,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
dnodeSendRedirectMsg(pMsg, true);
} else {
SMnodeMsg *pWrite = mnodeCreateMsg(pMsg);
dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
dTrace("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
......@@ -130,7 +130,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
}
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {
dDebug("msg:%p, app:%p type:%s is freed from mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
dTrace("msg:%p, app:%p type:%s is freed from mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
mnodeCleanupMsg(pWrite);
......
......@@ -96,7 +96,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
dDebug("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
return;
}
......@@ -151,7 +151,7 @@ void dnodeCleanupClient() {
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() == TSDB_RUN_STATUS_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dDebug("msg:%p is ignored since dnode is stopping", pMsg);
dTrace("msg:%p is ignored since dnode is stopping", pMsg);
rpcFreeCont(pMsg->pCont);
return;
}
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tqueue.h"
#include "tworker.h"
#include "dnodeVMgmt.h"
typedef struct {
......@@ -23,9 +24,8 @@ typedef struct {
char pCont[];
} SMgmtMsg;
static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread;
static SWorkerPool tsVMgmtWP;
static taos_queue tsVMgmtQueue = NULL;
static void * dnodeProcessMgmtQueue(void *param);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
......@@ -47,45 +47,23 @@ int32_t dnodeInitVMgmt() {
int32_t code = vnodeInitMgmt();
if (code != TSDB_CODE_SUCCESS) return -1;
tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) {
dError("failed to create the vmgmt queue set");
return -1;
}
tsMgmtQueue = taosOpenQueue();
if (tsMgmtQueue == NULL) {
dError("failed to create the vmgmt queue");
return -1;
}
taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL);
tsVMgmtWP.name = "vmgmt";
tsVMgmtWP.workerFp = dnodeProcessMgmtQueue;
tsVMgmtWP.min = 1;
tsVMgmtWP.max = 1;
if (tWorkerInit(&tsVMgmtWP) != 0) return -1;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process vmgmt queue, reason:%s", strerror(errno));
return -1;
}
tsVMgmtQueue = tWorkerAllocQueue(&tsVMgmtWP, NULL);
dInfo("dnode vmgmt is initialized");
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupVMgmt() {
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset);
if (tsQthread) pthread_join(tsQthread, NULL);
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
if (tsMgmtQset) taosCloseQset(tsMgmtQset);
tsMgmtQset = NULL;
tsMgmtQueue = NULL;
tWorkerFreeQueue(&tsVMgmtWP, tsVMgmtQueue);
tWorkerCleanup(&tsVMgmtWP);
tsVMgmtQueue = NULL;
vnodeCleanupMgmt();
}
......@@ -97,7 +75,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont;
memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt);
taosWriteQitem(tsVMgmtQueue, TAOS_QTYPE_RPC, pMgmt);
return TSDB_CODE_SUCCESS;
}
......@@ -112,28 +90,30 @@ void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont);
}
static void *dnodeProcessMgmtQueue(void *param) {
SMgmtMsg *pMgmt;
SRpcMsg * pMsg;
SRpcMsg rsp = {0};
int32_t qtype;
void * handle;
static void *dnodeProcessMgmtQueue(void *wparam) {
SWorker * pWorker = wparam;
SWorkerPool *pPool = pWorker->pPool;
SMgmtMsg * pMgmt;
SRpcMsg * pMsg;
SRpcMsg rsp = {0};
int32_t qtype;
void * handle;
while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset);
break;
}
pMsg = &pMgmt->rpcMsg;
dDebug("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
dTrace("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else {
rsp.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
}
dDebug("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
dTrace("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
if (rsp.code != TSDB_CODE_DND_ACTION_IN_PROGRESS) {
rsp.handle = pMsg->handle;
rsp.pCont = NULL;
......
......@@ -16,66 +16,35 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tqueue.h"
#include "tworker.h"
#include "dnodeVRead.h"
typedef struct {
pthread_t thread; // thread
int32_t workerId; // worker ID
} SVReadWorker;
typedef struct {
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
SVReadWorker * worker;
pthread_mutex_t mutex;
} SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *pWorker);
// module global variable
static SVReadWorkerPool tsVReadWP;
static taos_qset tsVReadQset;
static SWorkerPool tsVQueryWP;
static SWorkerPool tsVFetchWP;
int32_t dnodeInitVRead() {
tsVReadQset = taosOpenQset();
tsVReadWP.min = tsNumOfCores;
tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min;
tsVReadWP.worker = calloc(sizeof(SVReadWorker), tsVReadWP.max);
pthread_mutex_init(&tsVReadWP.mutex, NULL);
if (tsVReadWP.worker == NULL) return -1;
for (int i = 0; i < tsVReadWP.max; ++i) {
SVReadWorker *pWorker = tsVReadWP.worker + i;
pWorker->workerId = i;
}
tsVQueryWP.name = "vquery";
tsVQueryWP.workerFp = dnodeProcessReadQueue;
tsVQueryWP.min = tsNumOfCores;
tsVQueryWP.max = tsNumOfCores/* * tsNumOfThreadsPerCore*/;
// if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min;
if (tWorkerInit(&tsVQueryWP) != 0) return -1;
tsVFetchWP.name = "vfetch";
tsVFetchWP.workerFp = dnodeProcessReadQueue;
tsVFetchWP.min = MIN(4, tsNumOfCores);
tsVFetchWP.max = tsVFetchWP.min;
if (tWorkerInit(&tsVFetchWP) != 0) return -1;
dInfo("dnode vread is initialized, min worker:%d max worker:%d", tsVReadWP.min, tsVReadWP.max);
return 0;
}
void dnodeCleanupVRead() {
for (int i = 0; i < tsVReadWP.max; ++i) {
SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsVReadQset);
}
}
for (int i = 0; i < tsVReadWP.max; ++i) {
SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
}
free(tsVReadWP.worker);
taosCloseQset(tsVReadQset);
pthread_mutex_destroy(&tsVReadWP.mutex);
dInfo("dnode vread is closed");
tWorkerCleanup(&tsVFetchWP);
tWorkerCleanup(&tsVQueryWP);
}
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
......@@ -88,6 +57,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
assert(pHead->contLen > 0);
void *pVnode = vnodeAcquire(pHead->vgId);
if (pVnode != NULL) {
int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
......@@ -107,43 +77,20 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont);
}
void *dnodeAllocVReadQueue(void *pVnode) {
pthread_mutex_lock(&tsVReadWP.mutex);
taos_queue queue = taosOpenQueue();
if (queue == NULL) {
pthread_mutex_unlock(&tsVReadWP.mutex);
return NULL;
}
taosAddIntoQset(tsVReadQset, queue, pVnode);
// spawn a thread to process queue
if (tsVReadWP.num < tsVReadWP.max) {
do {
SVReadWorker *pWorker = tsVReadWP.worker + tsVReadWP.num;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
dError("failed to create thread to process vread vqueue since %s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
tsVReadWP.num++;
dDebug("dnode vread worker:%d is launched, total:%d", pWorker->workerId, tsVReadWP.num);
} while (tsVReadWP.num < tsVReadWP.min);
}
void *dnodeAllocVQueryQueue(void *pVnode) {
return tWorkerAllocQueue(&tsVQueryWP, pVnode);
}
pthread_mutex_unlock(&tsVReadWP.mutex);
dDebug("pVnode:%p, dnode vread queue:%p is allocated", pVnode, queue);
void *dnodeAllocVFetchQueue(void *pVnode) {
return tWorkerAllocQueue(&tsVFetchWP, pVnode);
}
return queue;
void dnodeFreeVQueryQueue(void *pQqueue) {
tWorkerFreeQueue(&tsVQueryWP, pQqueue);
}
void dnodeFreeVReadQueue(void *pRqueue) {
taosCloseQueue(pRqueue);
void dnodeFreeVFetchQueue(void *pFqueue) {
tWorkerFreeQueue(&tsVFetchWP, pFqueue);
}
void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
......@@ -160,18 +107,20 @@ void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
}
static void *dnodeProcessReadQueue(void *pWorker) {
SVReadMsg *pRead;
int32_t qtype;
void * pVnode;
static void *dnodeProcessReadQueue(void *wparam) {
SWorker * pWorker = wparam;
SWorkerPool *pPool = pWorker->pPool;
SVReadMsg * pRead;
int32_t qtype;
void * pVnode;
while (1) {
if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pRead, &pVnode) == 0) {
dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset);
if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pRead, &pVnode) == 0) {
dDebug("dnode vquery got no message from qset:%p, exiting", pPool->qset);
break;
}
dTrace("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle,
dTrace("msg:%p, app:%p type:%s will be processed in vquery queue, qtype:%d", pRead, pRead->rpcAhandle,
taosMsg[pRead->msgType], qtype);
int32_t code = vnodeProcessRead(pVnode, pRead);
......
......@@ -49,8 +49,10 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *pRqueue);
void *dnodeAllocVQueryQueue(void *pVnode);
void *dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue);
int32_t dnodeAllocateMPeerQueue();
void dnodeFreeMPeerQueue();
......
......@@ -473,7 +473,7 @@ typedef struct {
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
int64_t tableLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query.
int64_t limit;
int64_t offset;
......
......@@ -34,6 +34,7 @@ typedef struct {
void * rpcHandle;
void * rpcAhandle;
void * qhandle;
void * pVnode;
int8_t qtype;
int8_t msgType;
SRspRet rspRet;
......
......@@ -140,6 +140,11 @@ typedef struct SQueryCostInfo {
uint64_t numOfTimeWindows;
} SQueryCostInfo;
typedef struct {
int64_t vgroupLimit;
int64_t ts;
} SOrderedPrjQueryInfo;
typedef struct SQuery {
int16_t numOfCols;
int16_t numOfTags;
......@@ -167,6 +172,7 @@ typedef struct SQuery {
tFilePage** sdata;
STableQueryInfo* current;
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo;
} SQuery;
......@@ -185,7 +191,7 @@ typedef struct SQueryRuntimeEnv {
void* pQueryHandle;
void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not
bool topBotQuery; // false
bool topBotQuery; // TODO used bitwise flag
bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
......@@ -210,14 +216,13 @@ enum {
typedef struct SQInfo {
void* signature;
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
int64_t owner; // if it is in execution
void* tsdb;
SMemRef memRef;
int32_t vgId;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv;
// SArray* arrTableIdInfo;
SHashObj* arrTableIdInfo;
int32_t groupIndex;
......@@ -233,6 +238,7 @@ typedef struct SQInfo {
tsem_t ready;
int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp
} SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H
......@@ -128,11 +128,14 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
static void setQueryStatus(SQuery *pQuery, int8_t status);
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
static int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2;
}
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
......@@ -2138,8 +2141,31 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
}
static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
return pQInfo->rspContext != NULL;
}
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
static bool isQueryKilled(SQInfo *pQInfo) {
if (IS_QUERY_KILLED(pQInfo)) {
return true;
}
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
if (pQInfo->owner != 0 && ((taosGetTimestampSec() - pQInfo->startExecTs) > getMaximumIdleDurationSec()) &&
(!needBuildResAfterQueryComplete(pQInfo))) {
assert(pQInfo->startExecTs != 0);
qDebug("QInfo:%p retrieve not arrive beyond %d sec, abort current query execution, start:%"PRId64", current:%d", pQInfo, 1,
pQInfo->startExecTs, taosGetTimestampSec());
return true;
}
return false;
}
static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;}
static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
......@@ -2864,7 +2890,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -3432,7 +3458,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
int64_t startt = taosGetTimestampMs();
while (1) {
if (IS_QUERY_KILLED(pQInfo)) {
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo);
tfree(pTableList);
......@@ -4018,7 +4044,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
cond.twindow.skey, cond.twindow.ekey);
// check if query is killed or not
if (IS_QUERY_KILLED(pQInfo)) {
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
}
......@@ -4675,7 +4701,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) {
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -5112,7 +5138,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) {
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -5479,19 +5505,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// return;
// }
if (pQuery->prjInfo.vgroupLimit != -1) {
assert(pQuery->limit.limit == -1 && pQuery->limit.offset == 0);
} else if (pQuery->limit.limit != -1) {
assert(pQuery->prjInfo.vgroupLimit == -1);
}
bool hasMoreBlock = true;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SQueryCostInfo *summary = &pRuntimeEnv->summary;
while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) {
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
STableQueryInfo **pTableQueryInfo =
(STableQueryInfo **)taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
(STableQueryInfo **) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if (pTableQueryInfo == NULL) {
break;
}
......@@ -5503,6 +5535,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb);
}
if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->current->windowResInfo.size > pQuery->prjInfo.vgroupLimit) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
// it is a super table ordered projection query, check for the number of output for each vgroup
if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->rec.rows >= pQuery->prjInfo.vgroupLimit) {
if (QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.skey >= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
} else if (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.ekey <= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
}
uint32_t status = 0;
SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL;
......@@ -5520,6 +5571,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
ensureOutputBuffer(pRuntimeEnv, &blockInfo);
int64_t prev = getNumOfResult(pRuntimeEnv);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
......@@ -5530,17 +5583,30 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
int64_t inc = pQuery->rec.rows - prev;
pQuery->current->windowResInfo.size += (int32_t) inc;
// the flag may be set by tableApplyFunctionsOnBlock, clear it here
CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED);
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
skipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed
if (limitResults(pRuntimeEnv)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo);
break;
if (pQuery->prjInfo.vgroupLimit >= 0) {
if (((pQuery->rec.rows + pQuery->rec.total) < pQuery->prjInfo.vgroupLimit) || ((pQuery->rec.rows + pQuery->rec.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) {
if (QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts < blockInfo.window.ekey) {
pQuery->prjInfo.ts = blockInfo.window.ekey;
} else if (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts > blockInfo.window.skey) {
pQuery->prjInfo.ts = blockInfo.window.skey;
}
}
} else {
// the limitation of output result is reached, set the query completed
skipResults(pRuntimeEnv);
if (limitResults(pRuntimeEnv)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo);
break;
}
}
// while the output buffer is full or limit/offset is applied, query may be paused here
......@@ -5582,7 +5648,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) {
if (IS_QUERY_KILLED(pQInfo)) {
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -5768,7 +5834,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el);
// query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -5789,7 +5855,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED);
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
//TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
......@@ -5905,7 +5971,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
doSecondaryArithmeticProcess(pQuery);
if (IS_QUERY_KILLED(pQInfo)) {
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -6284,7 +6350,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset);
pQueryMsg->limit = htobe64(pQueryMsg->limit);
pQueryMsg->offset = htobe64(pQueryMsg->offset);
pQueryMsg->tableLimit = htobe64(pQueryMsg->tableLimit);
pQueryMsg->vgroupLimit = htobe64(pQueryMsg->vgroupLimit);
pQueryMsg->order = htons(pQueryMsg->order);
pQueryMsg->orderColId = htons(pQueryMsg->orderColId);
......@@ -6885,6 +6951,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->fillType = pQueryMsg->fillType;
pQuery->numOfTags = pQueryMsg->numOfTags;
pQuery->tagColList = pTagCols;
pQuery->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit;
pQuery->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) {
......@@ -7479,7 +7547,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY;
buildRes = (pQInfo->rspContext != NULL);
buildRes = needBuildResAfterQueryComplete(pQInfo);
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
// put into task to be executed.
......@@ -7488,6 +7556,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pthread_mutex_unlock(&pQInfo->lock);
// used in retrieve blocking model.
tsem_post(&pQInfo->ready);
return buildRes;
}
......@@ -7504,7 +7573,9 @@ bool qTableQuery(qinfo_t qinfo) {
return false;
}
if (IS_QUERY_KILLED(pQInfo)) {
pQInfo->startExecTs = taosGetTimestampSec();
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo);
return doBuildResCheck(pQInfo);
}
......@@ -7536,7 +7607,7 @@ bool qTableQuery(qinfo_t qinfo) {
}
SQuery* pQuery = pRuntimeEnv->pQuery;
if (IS_QUERY_KILLED(pQInfo)) {
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
......@@ -7564,30 +7635,31 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
int32_t code = TSDB_CODE_SUCCESS;
#if _NON_BLOCKING_RETRIEVE
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
if (tsHalfCoresForQuery) {
pQInfo->rspContext = pRspContext;
tsem_wait(&pQInfo->ready);
*buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows,
pQInfo->code);
code = pQInfo->code;
} else {
*buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
}
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
code = pQInfo->code;
pthread_mutex_unlock(&pQInfo->lock);
#else
tsem_wait(&pQInfo->ready);
*buildRes = true;
code = pQInfo->code;
#endif
pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->rowSize,
pQuery->rec.rows, tstrerror(pQInfo->code));
} else {
*buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
}
code = pQInfo->code;
pthread_mutex_unlock(&pQInfo->lock);
}
return code;
}
......@@ -7655,7 +7727,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) {
}
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
return IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
}
int32_t qKillQuery(qinfo_t qinfo) {
......@@ -7952,8 +8024,6 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
return NULL;
}
const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2 * 1000;
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
......@@ -7969,7 +8039,8 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
return NULL;
} else {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_QHANDLE_LIFE_SPAN);
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
(getMaximumIdleDurationSec()*1000));
// pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
......
......@@ -234,7 +234,13 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
}
int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
double v = GET_DOUBLE_VAL(value);
double v = 0;
if (pBucket->type == TSDB_DATA_TYPE_FLOAT) {
v = GET_FLOAT_VAL(value);
} else {
v = GET_DOUBLE_VAL(value);
}
int32_t index = -1;
if (pBucket->range.dMinVal == DBL_MAX) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TWORKER_H
#define TDENGINE_TWORKER_H
#ifdef __cplusplus
extern "C" {
#endif
typedef void *(*FWorkerThread)(void *pWorker);
struct SWorkerPool;
typedef struct {
pthread_t thread; // thread
int32_t id; // worker ID
struct SWorkerPool *pPool;
} SWorker;
typedef struct SWorkerPool {
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
void * qset;
char * name;
SWorker *worker;
FWorkerThread workerFp;
pthread_mutex_t mutex;
} SWorkerPool;
int32_t tWorkerInit(SWorkerPool *pPool);
void tWorkerCleanup(SWorkerPool *pPool);
void * tWorkerAllocQueue(SWorkerPool *pPool, void *ahandle);
void tWorkerFreeQueue(SWorkerPool *pPool, void *pQueue);
#ifdef __cplusplus
}
#endif
#endif
......@@ -240,9 +240,6 @@ void taosReadGlobalLogCfg() {
int olen, vlen;
char fileName[PATH_MAX] = {0};
mDebugFlag = 135;
sdbDebugFlag = 135;
wordexp_t full_path;
if ( 0 != wordexp(configDir, &full_path, 0)) {
printf("\nconfig file: %s wordexp fail! reason:%s\n", configDir, strerror(errno));
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
#include "tqueue.h"
#include "tworker.h"
int32_t tWorkerInit(SWorkerPool *pPool) {
pPool->qset = taosOpenQset();
pPool->worker = calloc(sizeof(SWorker), pPool->max);
pthread_mutex_init(&pPool->mutex, NULL);
for (int i = 0; i < pPool->max; ++i) {
SWorker *pWorker = pPool->worker + i;
pWorker->id = i;
pWorker->pPool = pPool;
}
uInfo("worker:%s is initialized, min:%d max:%d", pPool->name, pPool->min, pPool->max);
return 0;
}
void tWorkerCleanup(SWorkerPool *pPool) {
for (int i = 0; i < pPool->max; ++i) {
SWorker *pWorker = pPool->worker + i;
if(taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(pPool->qset);
}
}
for (int i = 0; i < pPool->max; ++i) {
SWorker *pWorker = pPool->worker + i;
if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL);
}
}
free(pPool->worker);
taosCloseQset(pPool->qset);
pthread_mutex_destroy(&pPool->mutex);
uInfo("worker:%s is closed", pPool->name);
}
void *tWorkerAllocQueue(SWorkerPool *pPool, void *ahandle) {
pthread_mutex_lock(&pPool->mutex);
taos_queue pQueue = taosOpenQueue();
if (pQueue == NULL) {
pthread_mutex_unlock(&pPool->mutex);
return NULL;
}
taosAddIntoQset(pPool->qset, pQueue, ahandle);
// spawn a thread to process queue
if (pPool->num < pPool->max) {
do {
SWorker *pWorker = pPool->worker + pPool->num;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, pPool->workerFp, pWorker) != 0) {
uError("worker:%s:%d failed to create thread to process since %s", pPool->name, pWorker->id, strerror(errno));
}
pthread_attr_destroy(&thAttr);
pPool->num++;
uDebug("worker:%s:%d is launched, total:%d", pPool->name, pWorker->id, pPool->num);
} while (pPool->num < pPool->min);
}
pthread_mutex_unlock(&pPool->mutex);
uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pPool->name, pQueue, ahandle);
return pQueue;
}
void tWorkerFreeQueue(SWorkerPool *pPool, void *pQueue) {
taosCloseQueue(pQueue);
uDebug("worker:%s, queue:%p is freed", pPool->name, pQueue);
}
......@@ -47,8 +47,9 @@ typedef struct {
int8_t isCommiting;
uint64_t version; // current version
uint64_t fversion; // version on saved data file
void * wqueue;
void * rqueue;
void * wqueue; // write queue
void * qqueue; // read query queue
void * fqueue; // read fetch/cancel queue
void * wal;
void * tsdb;
int64_t sync;
......
......@@ -212,8 +212,9 @@ int32_t vnodeOpen(int32_t vgId) {
pVnode->fversion = pVnode->version;
pVnode->wqueue = dnodeAllocVWriteQueue(pVnode);
pVnode->rqueue = dnodeAllocVReadQueue(pVnode);
if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) {
pVnode->qqueue = dnodeAllocVQueryQueue(pVnode);
pVnode->fqueue = dnodeAllocVFetchQueue(pVnode);
if (pVnode->wqueue == NULL || pVnode->qqueue == NULL || pVnode->fqueue == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
......@@ -373,9 +374,14 @@ void vnodeDestroy(SVnodeObj *pVnode) {
pVnode->wqueue = NULL;
}
if (pVnode->rqueue) {
dnodeFreeVReadQueue(pVnode->rqueue);
pVnode->rqueue = NULL;
if (pVnode->qqueue) {
dnodeFreeVQueryQueue(pVnode->qqueue);
pVnode->qqueue = NULL;
}
if (pVnode->fqueue) {
dnodeFreeVFetchQueue(pVnode->fqueue);
pVnode->fqueue = NULL;
}
tfree(pVnode->rootDir);
......
......@@ -14,7 +14,6 @@
*/
#define _DEFAULT_SOURCE
#define _NON_BLOCKING_RETRIEVE 0
#include "os.h"
#include "taosmsg.h"
#include "tqueue.h"
......@@ -25,12 +24,12 @@
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
int32_t vnodeInitRead(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
return 0;
}
......@@ -115,13 +114,16 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
}
pRead->qtype = qtype;
atomic_add_fetch_32(&pVnode->refCount, 1);
atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
taosWriteQitem(pVnode->rqueue, qtype, pRead);
return TSDB_CODE_SUCCESS;
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
return taosWriteQitem(pVnode->fqueue, qtype, pRead);
} else {
vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
return taosWriteQitem(pVnode->qqueue, qtype, pRead);
}
}
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
......@@ -197,26 +199,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// qHandle needs to be freed correctly
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pRead->pCont;
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pRead->rpcHandle);
assert(pRead->contLen > 0 && killQueryMsg->free == 1);
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
pRead->rpcHandle);
} else {
assert(*qhandle == (void *)killQueryMsg->qhandle);
qKillQuery(*qhandle);
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
}
return TSDB_CODE_TSC_QUERY_CANCELLED;
vError("error rpc msg in query, %s", tstrerror(pRead->code));
}
// assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL);
// if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
// SCancelQueryMsg *pMsg = (SCancelQueryMsg *)pRead->pCont;
//// pMsg->free = htons(killQueryMsg->free);
// pMsg->qhandle = htobe64(pMsg->qhandle);
//
// vWarn("QInfo:%p connection %p broken, kill query", (void *)pMsg->qhandle, pRead->rpcHandle);
//// assert(pRead->contLen > 0 && pMsg->free == 1);
//
// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pMsg->qhandle);
// if (qhandle == NULL || *qhandle == NULL) {
// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pMsg->qhandle, pRead->rpcHandle);
// } else {
// assert(*qhandle == (void *)pMsg->qhandle);
//
// qKillQuery(*qhandle);
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
// }
//
// return TSDB_CODE_TSC_QUERY_CANCELLED;
// }
int32_t code = TSDB_CODE_SUCCESS;
void ** handle = NULL;
......@@ -338,11 +343,12 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, invalid handle in retrieving result, code:0x%08x, QInfo:%p", pVnode->vgId, code, (void *)pRetrieve->qhandle);
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle);
......@@ -373,8 +379,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true;
} else { // result is not ready, return immediately
assert(buildRes == true);
// Only effects in the non-blocking model
if (!tsHalfCoresForQuery) {
if (!buildRes) {
......@@ -401,12 +405,11 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
killQueryMsg->qhandle = htobe64((uint64_t)qhandle);
killQueryMsg->free = htons(1);
killQueryMsg->header.vgId = htonl(vgId);
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
pMsg->qhandle = htobe64((uint64_t)qhandle);
pMsg->header.vgId = htonl(vgId);
pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg));
return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg));
}
......@@ -166,14 +166,14 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
char walName[WAL_FILE_LEN];
snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
if (code != TSDB_CODE_SUCCESS) {
wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
continue;
}
wDebug("vgId:%d, file:%s, restore success", pWal->vgId, walName);
wInfo("vgId:%d, file:%s, restore success", pWal->vgId, walName);
count++;
}
......@@ -326,7 +326,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
offset = offset + sizeof(SWalHead) + pHead->len;
wDebug("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
fileId, pHead->version, pWal->version, pHead->len);
pWal->version = pHead->version;
......
// execute this before anything else, including requesting any time on an agent
if (currentBuild.rawBuild.getCauses().toString().contains('BranchIndexingCause')) {
print "INFO: Build skipped due to trigger being Branch Indexing"
currentBuild.result = 'ABORTED' // optional, gives a better hint to the user that it's been skipped, rather than the default which shows it's successful
return
}
properties([pipelineTriggers([githubPush()])])
node {
git url: 'https://github.com/liuyq-617/TDengine'
git url: 'https://github.com/taosdata/TDengine.git'
}
def pre_test(){
......
......@@ -50,7 +50,7 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql);
}
void Test(char *qstr, const char *input, int i);
void Test(TAOS *taos, char *qstr, int i);
int main(int argc, char *argv[]) {
char qstr[1024];
......@@ -63,21 +63,22 @@ int main(int argc, char *argv[]) {
// init TAOS
taos_init();
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
exit(1);
}
for (int i = 0; i < 4000000; i++) {
Test(qstr, argv[1], i);
Test(taos, qstr, i);
}
taos_close(taos);
taos_cleanup();
}
void Test(char *qstr, const char *input, int index) {
TAOS *taos = taos_connect(input, "root", "taosdata", NULL, 0);
void Test(TAOS *taos, char *qstr, int index) {
printf("==================test at %d\n================================", index);
queryDB(taos, "drop database if exists demo");
queryDB(taos, "create database demo");
TAOS_RES *result;
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
exit(1);
}
queryDB(taos, "use demo");
queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))");
......@@ -131,6 +132,5 @@ void Test(char *qstr, const char *input, int index) {
taos_free_result(result);
printf("====demo end====\n\n");
taos_close(taos);
}
......@@ -30,7 +30,6 @@ python3 ./test.py -f table/create_sensitive.py
python3 ./test.py -f table/max_table_length.py
python3 ./test.py -f table/alter_column.py
python3 ./test.py -f table/boundary.py
python3 ./test.py -f table/create-a-lot.py
python3 ./test.py -f table/create.py
python3 ./test.py -f table/del_stable.py
python3 ./test.py -f table/queryWithTaosdKilled.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册