提交 627166db 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch 'develop' into docs/sangshuduo/update-doc-align-with-1.0.5

...@@ -36,7 +36,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); ...@@ -36,7 +36,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql); int32_t tscHandleInsertRetry(SSqlObj* parent, SSqlObj* child);
void tscBuildResFromSubqueries(SSqlObj *pSql); void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql); TAOS_ROW doSetResultRowData(SSqlObj *pSql);
......
...@@ -110,11 +110,12 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint ...@@ -110,11 +110,12 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
uint32_t offset); uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList); void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql);
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
STableDataBlocks** dataBlocks);
/** /**
* for the projection query on metric or point interpolation query on metric, * for the projection query on metric or point interpolation query on metric,
...@@ -275,6 +276,8 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); ...@@ -275,6 +276,8 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql); bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql); bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
......
...@@ -37,40 +37,6 @@ extern "C" { ...@@ -37,40 +37,6 @@ extern "C" {
#include "qTsbuf.h" #include "qTsbuf.h"
#include "tcmdtype.h" #include "tcmdtype.h"
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return malloc(__size);
}
}
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return calloc(num, __size);
}
}
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return realloc(p, __size);
}
}
#define calloc u_calloc
#define malloc u_malloc
#define realloc u_realloc
#endif
// forward declaration // forward declaration
struct SSqlInfo; struct SSqlInfo;
struct SLocalReducer; struct SLocalReducer;
...@@ -78,7 +44,7 @@ struct SLocalReducer; ...@@ -78,7 +44,7 @@ struct SLocalReducer;
// data source from sql string or from file // data source from sql string or from file
enum { enum {
DATA_FROM_SQL_STRING = 1, DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2, DATA_FROM_DATA_FILE = 2,
}; };
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
...@@ -118,10 +84,10 @@ typedef struct STableMetaInfo { ...@@ -118,10 +84,10 @@ typedef struct STableMetaInfo {
* 1. keep the vgroup index during the multi-vnode super table projection query * 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion * 2. keep the vgroup index for multi-vnode insertion
*/ */
int32_t vgroupIndex; int32_t vgroupIndex;
char name[TSDB_TABLE_FNAME_LEN]; // (super) table name char name[TSDB_TABLE_FNAME_LEN]; // (super) table name
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray* tagColList; // SArray<SColumn*>, involved tag columns SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo; } STableMetaInfo;
/* the structure for sql function in select clause */ /* the structure for sql function in select clause */
...@@ -136,7 +102,7 @@ typedef struct SSqlExpr { ...@@ -136,7 +102,7 @@ typedef struct SSqlExpr {
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3 tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression. int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id int16_t resColId; // result column id
} SSqlExpr; } SSqlExpr;
typedef struct SColumnIndex { typedef struct SColumnIndex {
...@@ -204,22 +170,17 @@ typedef struct SParamInfo { ...@@ -204,22 +170,17 @@ typedef struct SParamInfo {
} SParamInfo; } SParamInfo;
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_FNAME_LEN]; char tableId[TSDB_TABLE_FNAME_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table int32_t rowSize; // row size for current table
uint32_t nAllocSize; uint32_t nAllocSize;
uint32_t headerSize; // header for table info (uid, tid, submit metadata) uint32_t headerSize; // header for table info (uid, tid, submit metadata)
uint32_t size; uint32_t size;
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
/*
* the table meta of table, the table meta will be used during submit, keep a ref
* to avoid it to be removed from cache
*/
STableMeta *pTableMeta;
char *pData; char *pData;
// for parameter ('?') binding // for parameter ('?') binding
...@@ -252,7 +213,7 @@ typedef struct SQueryInfo { ...@@ -252,7 +213,7 @@ typedef struct SQueryInfo {
int64_t clauseLimit; // limit for current sub clause int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t tableLimit; // table limit in case of super table projection query + global order + limit int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id int16_t resColumnId; // result column id
...@@ -284,10 +245,14 @@ typedef struct { ...@@ -284,10 +245,14 @@ typedef struct {
int32_t numOfParams; int32_t numOfParams;
int8_t dataSourceType; // load data from file or not int8_t dataSourceType; // load data from file or not
int8_t submitSchema; // submit block is built with table schema int8_t submitSchema; // submit block is built with table schema
STagData *pTagData; // NOTE: pTagData->data is used as a variant length array STagData *pTagData; // NOTE: pTagData->data is used as a variant length array
SHashObj *pTableList; // referred table involved in sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
} SSqlCmd; } SSqlCmd;
typedef struct SResRec { typedef struct SResRec {
......
...@@ -410,52 +410,26 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -410,52 +410,26 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code)); tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
goto _error; goto _error;
} else {
tscDebug("%p get %s successfully", pSql, msg);
} }
tscDebug("%p get %s successfully", pSql, msg);
if (pSql->pStream == NULL) { if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// check if it is a sub-query of super table query first, if true, enter another routine // check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql); tscDebug("%p update table meta in local cache, continue to process sql and send the corresponding query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
// param already freed by other routine and pSql in tscCache when ctrl + c
if (atomic_load_ptr(&pSql->param) == NULL) {
return;
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSql;
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
// tscProcessSql can add error into async res
tscProcessSql(pSql);
return;
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding tid_tag query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
} else {
assert(code == TSDB_CODE_SUCCESS);
} }
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
// tscProcessSql can add error into async res // tscProcessSql can add error into async res
tscProcessSql(pSql); tscProcessSql(pSql);
return; return;
...@@ -465,16 +439,18 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -465,16 +439,18 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
} else {
assert(code == TSDB_CODE_SUCCESS);
} }
assert(pCmd->command != TSDB_SQL_INSERT);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons: // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
// 2. vnode may need the schema information along with submit block to update its local table schema. // 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) { if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql); tscDebug("%p redo parse sql string and proceed", pSql);
pCmd->parseFinished = false; pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd, false); tscResetSqlCmdObj(pCmd, false);
...@@ -486,16 +462,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -486,16 +462,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error; goto _error;
} }
if (pCmd->command == TSDB_SQL_INSERT) { tscProcessSql(pSql);
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql->fp = pSql->fetchFp; // restore the fp
tscHandleInsertRetry(pSql);
} else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue
tscProcessSql(pSql);
}
}else { // in all other cases, simple retry }else { // in all other cases, simple retry
tscProcessSql(pSql); tscProcessSql(pSql);
} }
...@@ -551,6 +519,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -551,6 +519,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (!pSql->cmd.parseFinished) { if (!pSql->cmd.parseFinished) {
tsParseSql(pSql, false); tsParseSql(pSql, false);
} }
(*pSql->fp)(pSql->param, pSql, code); (*pSql->fp)(pSql->param, pSql, code);
return; return;
......
...@@ -2589,10 +2589,11 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) { ...@@ -2589,10 +2589,11 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) {
// all data are null, set it completed // all data are null, set it completed
if (pInfo->numOfElems == 0) { if (pInfo->numOfElems == 0) {
pResInfo->complete = true; pResInfo->complete = true;
} else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, GET_DOUBLE_VAL(&pInfo->minval), GET_DOUBLE_VAL(&pInfo->maxval));
} }
pInfo->stage += 1; pInfo->stage += 1;
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, GET_DOUBLE_VAL(&pInfo->minval), GET_DOUBLE_VAL(&pInfo->maxval));
} else { } else {
pResInfo->complete = true; pResInfo->complete = true;
} }
......
...@@ -726,10 +726,14 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -726,10 +726,14 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema p1 = {0}; SSchema p1 = {0};
if (pExpr->colInfo.colIndex != TSDB_TBNAME_COLUMN_INDEX) { if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
} else {
p1 = tGetTableNameColumnSchema(); p1 = tGetTableNameColumnSchema();
} else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
p1.bytes = pExpr->resBytes;
p1.type = (uint8_t) pExpr->resType;
tstrncpy(p1.name, pExpr->aliasName, tListLen(p1.name));
} else {
p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
} }
int32_t inter = 0; int32_t inter = 0;
......
...@@ -686,17 +686,14 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) { ...@@ -686,17 +686,14 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
} }
} }
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd, static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColInfo *spd, int32_t *totalNum) {
int32_t *totalNum) {
SSqlCmd * pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
STableDataBlocks *dataBuf = NULL; STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
...@@ -1058,18 +1055,17 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1058,18 +1055,17 @@ int tsParseInsertSql(SSqlObj *pSql) {
return code; return code;
} }
if (NULL == pCmd->pTableList) { if (NULL == pCmd->pTableBlockHashList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES); if (NULL == pCmd->pTableBlockHashList) {
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _clean;
} }
} else { } else {
str = pCmd->curSql; str = pCmd->curSql;
} }
tscDebug("%p create data block list for submit data:%p, pTableList:%p", pSql, pCmd->pDataBlocks, pCmd->pTableList); tscDebug("%p create data block list hashList:%p", pSql, pCmd->pTableBlockHashList);
while (1) { while (1) {
int32_t index = 0; int32_t index = 0;
...@@ -1091,7 +1087,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1091,7 +1087,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
*/ */
if (totalNum == 0) { if (totalNum == 0) {
code = TSDB_CODE_TSC_INVALID_SQL; code = TSDB_CODE_TSC_INVALID_SQL;
goto _error; goto _clean;
} else { } else {
break; break;
} }
...@@ -1104,11 +1100,11 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1104,11 +1100,11 @@ int tsParseInsertSql(SSqlObj *pSql) {
// Check if the table name available or not // Check if the table name available or not
if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) { if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
goto _error; goto _clean;
} }
if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) { if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
...@@ -1122,12 +1118,12 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1122,12 +1118,12 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscError("%p async insert parse error, code:%s", pSql, tstrerror(code)); tscError("%p async insert parse error, code:%s", pSql, tstrerror(code));
pCmd->curSql = NULL; pCmd->curSql = NULL;
goto _error; goto _clean;
} }
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
goto _error; goto _clean;
} }
index = 0; index = 0;
...@@ -1136,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1136,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (sToken.n == 0) { if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
goto _error; goto _clean;
} }
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
...@@ -1148,32 +1144,32 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1148,32 +1144,32 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
/* /*
* app here insert data in different vnodes, so we need to set the following * app here insert data in different vnodes, so we need to set the following
* data in another submit procedure using async insert routines * data in another submit procedure using async insert routines
*/ */
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); code = doParseInsertStatement(pCmd, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
} else if (sToken.type == TK_FILE) { } else if (sToken.type == TK_FILE) {
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
index = 0; index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL); sToken = tStrGetToken(str, &index, false, 0, NULL);
if (sToken.type != TK_STRING && sToken.type != TK_ID) { if (sToken.type != TK_STRING && sToken.type != TK_ID) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error; goto _clean;
} }
str += index; str += index;
if (sToken.n == 0) { if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error; goto _clean;
} }
strncpy(pCmd->payload, sToken.z, sToken.n); strncpy(pCmd->payload, sToken.z, sToken.n);
...@@ -1183,7 +1179,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1183,7 +1179,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
wordexp_t full_path; wordexp_t full_path;
if (wordexp(pCmd->payload, &full_path, 0) != 0) { if (wordexp(pCmd->payload, &full_path, 0) != 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
goto _error; goto _clean;
} }
tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize); tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
...@@ -1195,7 +1191,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1195,7 +1191,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
SSchema * pSchema = tscGetTableSchema(pTableMeta); SSchema * pSchema = tscGetTableSchema(pTableMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
SParsedDataColInfo spd = {0}; SParsedDataColInfo spd = {0};
...@@ -1230,7 +1226,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1230,7 +1226,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (spd.hasVal[t] == true) { if (spd.hasVal[t] == true) {
code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
goto _error; goto _clean;
} }
spd.hasVal[t] = true; spd.hasVal[t] = true;
...@@ -1241,13 +1237,13 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1241,13 +1237,13 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (!findColumnIndex) { if (!findColumnIndex) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
goto _error; goto _clean;
} }
} }
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) { if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
goto _error; goto _clean;
} }
index = 0; index = 0;
...@@ -1256,16 +1252,16 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1256,16 +1252,16 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (sToken.type != TK_VALUES) { if (sToken.type != TK_VALUES) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
goto _error; goto _clean;
} }
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); code = doParseInsertStatement(pCmd, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
} else { } else {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
goto _error; goto _clean;
} }
} }
...@@ -1274,25 +1270,18 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1274,25 +1270,18 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean; goto _clean;
} }
if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
} }
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
goto _clean; goto _clean;
_error:
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
_clean: _clean:
taosHashCleanup(pCmd->pTableList); pCmd->curSql = NULL;
pCmd->pTableList = NULL;
pCmd->curSql = NULL;
pCmd->parseFinished = 1; pCmd->parseFinished = 1;
return code; return code;
} }
...@@ -1373,6 +1362,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1373,6 +1362,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
pSql->parseRetry++; pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo); ret = tscToSQLCmd(pSql, &SQLInfo);
} }
SQLInfoDestroy(&SQLInfo); SQLInfoDestroy(&SQLInfo);
} }
...@@ -1399,7 +1389,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock ...@@ -1399,7 +1389,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL); return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL);
} }
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -800,9 +800,9 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -800,9 +800,9 @@ static int insertStmtExecute(STscStmt* stmt) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) {
// merge according to vgid // merge according to vgid
int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks); int code = tscMergeTableDataBlocks(stmt->pSql);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -1310,7 +1310,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1310,7 +1310,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
SColumnIndex index = {.tableIndex = tableIndex}; SColumnIndex index = {.tableIndex = tableIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double), SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double),
-1000, sizeof(double), false); getNewResColId(pQueryInfo), sizeof(double), false);
char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z; char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z;
size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1); size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1);
...@@ -5317,7 +5317,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn ...@@ -5317,7 +5317,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
// keep original limitation value in globalLimit // keep original limitation value in globalLimit
pQueryInfo->clauseLimit = pQueryInfo->limit.limit; pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
pQueryInfo->prjOffset = pQueryInfo->limit.offset; pQueryInfo->prjOffset = pQueryInfo->limit.offset;
pQueryInfo->tableLimit = -1; pQueryInfo->vgroupLimit = -1;
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/* /*
...@@ -5327,7 +5327,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn ...@@ -5327,7 +5327,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
* than or equal to the value of limit. * than or equal to the value of limit.
*/ */
if (pQueryInfo->limit.limit > 0) { if (pQueryInfo->limit.limit > 0) {
pQueryInfo->tableLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset; pQueryInfo->vgroupLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset;
pQueryInfo->limit.limit = -1; pQueryInfo->limit.limit = -1;
} }
......
...@@ -280,19 +280,19 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -280,19 +280,19 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
int32_t cmd = pCmd->command; int32_t cmd = pCmd->command;
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
pSql->res.code = rpcMsg->code; // keep the previous error code pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) { if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
...@@ -451,10 +451,10 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -451,10 +451,10 @@ int tscProcessSql(SSqlObj *pSql) {
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pRetrieveMsg->free = htons(pQueryInfo->type); pRetrieveMsg->free = htons(pQueryInfo->type);
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
// todo valid the vgroupId at the client side // todo valid the vgroupId at the client side
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -681,7 +681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -681,7 +681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->tableLimit = htobe64(pQueryInfo->tableLimit); pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
...@@ -1394,6 +1394,43 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { ...@@ -1394,6 +1394,43 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
// pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
//
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// int32_t vgIndex = pTableMetaInfo->vgroupIndex;
// if (pTableMetaInfo->pVgroupTables == NULL) {
// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
//
// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
// } else {
// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
// assert(vgIndex >= 0 && vgIndex < numOfVgroups);
//
// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
//
// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
// }
// } else {
// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
// }
//
// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
//
// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
// return TSDB_CODE_SUCCESS;
//}
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SAlterDbMsg); pCmd->payloadLen = sizeof(SAlterDbMsg);
......
...@@ -900,9 +900,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -900,9 +900,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower(pSql->sqlstr, sql); strtolower(pSql->sqlstr, sql);
pCmd->curSql = NULL; pCmd->curSql = NULL;
if (NULL != pCmd->pTableList) { if (NULL != pCmd->pTableBlockHashList) {
taosHashCleanup(pCmd->pTableList); taosHashCleanup(pCmd->pTableBlockHashList);
pCmd->pTableList = NULL; pCmd->pTableBlockHashList = NULL;
} }
pSql->fp = asyncCallback; pSql->fp = asyncCallback;
......
...@@ -2149,6 +2149,29 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -2149,6 +2149,29 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
} }
} }
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
if (pParentObj->retry > pParentObj->maxRetry) {
tscError("%p max retry reached, abort the retry effort", pParentObj)
return false;
}
for (int32_t i = 0; i < numOfSub; ++i) {
int32_t code = pParentObj->pSubs[i]->res.code;
if (code == TSDB_CODE_SUCCESS) {
continue;
}
if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE && code != TSDB_CODE_TDB_INVALID_TABLE_ID &&
code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL &&
code != TSDB_CODE_APP_NOT_READY) {
pParentObj->res.code = code;
return false;
}
}
return true;
}
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param; SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql; SSqlObj* pParentObj = pSupporter->pSql;
...@@ -2163,23 +2186,80 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2163,23 +2186,80 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
assert(pSql != NULL && pSql->res.code == numOfRows); assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code; pParentObj->res.code = pSql->res.code;
}
tfree(pSupporter); // set the flag in the parent sqlObj
if (pSql->cmd.submitSchema) {
pParentObj->cmd.submitSchema = 1;
}
}
if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) {
return; return;
} }
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
// restore user defined fp // restore user defined fp
pParentObj->fp = pParentObj->fetchFp; pParentObj->fp = pParentObj->fetchFp;
int32_t numOfSub = pParentObj->subState.numOfSub;
if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
for(int32_t i = 0; i < numOfSub; ++i) {
SSqlObj* pSql = pParentObj->pSubs[i];
tfree(pSql->param);
}
// todo remove this parameter in async callback function definition.
// all data has been sent to vnode, call user function
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else {
if (!needRetryInsert(pParentObj, numOfSub)) {
tscQueueAsyncRes(pParentObj);
return;
}
int32_t numOfFailed = 0;
for(int32_t i = 0; i < numOfSub; ++i) {
SSqlObj* pSql = pParentObj->pSubs[i];
if (pSql->res.code != TSDB_CODE_SUCCESS) {
numOfFailed += 1;
// clean up tableMeta in cache
tscFreeQueryInfo(&pSql->cmd, true);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
// todo remove this parameter in async callback function definition. tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql);
// all data has been sent to vnode, call user function }
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; }
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
pParentObj->res.numOfRows, numOfFailed, numOfSub);
tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables);
for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
taosCacheRelease(tscMetaCache, (void**)&(pParentObj->cmd.pTableMetaList[i]), true);
}
pParentObj->cmd.parseFinished = false;
pParentObj->subState.numOfRemain = numOfFailed;
pParentObj->subState.numOfSub = numOfFailed;
tscResetSqlCmdObj(&pParentObj->cmd, false);
tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry++);
int32_t code = tsParseSql(pParentObj, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) {
pParentObj->res.code = code;
tscQueueAsyncRes(pParentObj);
return;
}
tscDoQuery(pParentObj);
}
} }
/** /**
...@@ -2187,19 +2267,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2187,19 +2267,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
* @param pSql * @param pSql
* @return * @return
*/ */
int32_t tscHandleInsertRetry(SSqlObj* pSql) { int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
assert(pSql != NULL && pSql->param != NULL); assert(pSql != NULL && pSql->param != NULL);
SSqlCmd* pCmd = &pSql->cmd; // SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
assert(pSupporter->index < pSupporter->pSql->subState.numOfSub); assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index);
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
// free the data block created from insert sql string // free the data block created from insert sql string
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); // pCmd->pDataBlocks = tscDestroyBlockArrayList(pParent->cmd.pDataBlocks);
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
...@@ -2213,6 +2293,20 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -2213,6 +2293,20 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
// it is the failure retry insert
if (pSql->pSubs != NULL) {
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, i);
if (pSub->res.code != TSDB_CODE_SUCCESS) {
tscHandleInsertRetry(pSql, pSub);
}
}
return TSDB_CODE_SUCCESS;
}
pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks);
assert(pSql->subState.numOfSub > 0); assert(pSql->subState.numOfSub > 0);
......
...@@ -333,13 +333,15 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -333,13 +333,15 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (isNull(p, TSDB_DATA_TYPE_NCHAR)) { if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
memcpy(dst, p, varDataTLen(p)); memcpy(dst, p, varDataTLen(p));
} else { } else if (varDataLen(p) > 0) {
int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst)); int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
varDataSetLen(dst, length); varDataSetLen(dst, length);
if (length == 0) { if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p); tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
} }
} else {
varDataSetLen(dst, 0);
} }
p += pInfo->field.bytes; p += pInfo->field.bytes;
...@@ -377,7 +379,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { ...@@ -377,7 +379,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
} }
static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) { void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
if (pCmd == NULL || pCmd->numOfClause == 0) { if (pCmd == NULL || pCmd->numOfClause == 0) {
return; return;
} }
...@@ -403,12 +405,12 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) { ...@@ -403,12 +405,12 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
pCmd->msgType = 0; pCmd->msgType = 0;
pCmd->parseFinished = 0; pCmd->parseFinished = 0;
pCmd->autoCreated = 0; pCmd->autoCreated = 0;
pCmd->numOfTables = 0;
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL; tfree(pCmd->pTableMetaList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeQueryInfo(pCmd, removeFromCache); tscFreeQueryInfo(pCmd, removeFromCache);
} }
...@@ -575,6 +577,21 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { ...@@ -575,6 +577,21 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
return NULL; return NULL;
} }
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable) {
if (pBlockHashTable == NULL) {
return NULL;
}
STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL);
while(p) {
tscDestroyDataBlock(*p);
p = taosHashIterate(pBlockHashTable, p);
}
taosHashCleanup(pBlockHashTable);
return NULL;
}
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
assert(pDataBlock->pTableMeta != NULL); assert(pDataBlock->pTableMeta != NULL);
...@@ -671,9 +688,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -671,9 +688,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) {
STableDataBlocks** dataBlocks) {
*dataBlocks = NULL; *dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
...@@ -688,7 +704,9 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t ...@@ -688,7 +704,9 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
} }
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
taosArrayPush(pDataBlockList, dataBlocks); if (pBlockList) {
taosArrayPush(pBlockList, dataBlocks);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -769,22 +787,37 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { ...@@ -769,22 +787,37 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result; return result;
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { static void extractTableMeta(SSqlCmd* pCmd) {
pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->pTableBlockHashList);
pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES);
STableDataBlocks **p1 = taosHashIterate(pCmd->pTableBlockHashList, NULL);
int32_t i = 0;
while(p1) {
STableDataBlocks* pBlocks = *p1;
pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta);
p1 = taosHashIterate(pCmd->pTableBlockHashList, p1);
}
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
size_t total = taosArrayGetSize(pTableDataBlockList); STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL);
for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = *p;
while(pOneTableBlock) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta); int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
...@@ -839,14 +872,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -839,14 +872,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
// the length does not include the SSubmitBlk structure // the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen); pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1; dataBuf->numOfTables += 1;
p = taosHashIterate(pCmd->pTableBlockHashList, p);
if (p == NULL) {
break;
}
pOneTableBlock = *p;
} }
tscDestroyBlockArrayList(pTableDataBlockList); extractTableMeta(pCmd);
// free the table data blocks; // free the table data blocks;
pCmd->pDataBlocks = pVnodeDataBlockList; pCmd->pDataBlocks = pVnodeDataBlockList;
// tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2023,6 +2061,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2023,6 +2061,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNewQueryInfo->limit = pQueryInfo->limit; pNewQueryInfo->limit = pQueryInfo->limit;
pNewQueryInfo->slimit = pQueryInfo->slimit; pNewQueryInfo->slimit = pQueryInfo->slimit;
pNewQueryInfo->order = pQueryInfo->order; pNewQueryInfo->order = pQueryInfo->order;
pNewQueryInfo->vgroupLimit = pQueryInfo->vgroupLimit;
pNewQueryInfo->tsBuf = NULL; pNewQueryInfo->tsBuf = NULL;
pNewQueryInfo->fillType = pQueryInfo->fillType; pNewQueryInfo->fillType = pQueryInfo->fillType;
pNewQueryInfo->fillVal = NULL; pNewQueryInfo->fillVal = NULL;
......
...@@ -36,7 +36,7 @@ enum { ...@@ -36,7 +36,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
// the SQL below is for mgmt node // the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
......
...@@ -24,8 +24,10 @@ extern "C" { ...@@ -24,8 +24,10 @@ extern "C" {
int32_t dnodeInitVRead(); int32_t dnodeInitVRead();
void dnodeCleanupVRead(); void dnodeCleanupVRead();
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg); void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode); void * dnodeAllocVQueryQueue(void *pVnode);
void dnodeFreeVReadQueue(void *pRqueue); void * dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tqueue.h" #include "tqueue.h"
#include "tworker.h"
#include "dnodeVMgmt.h" #include "dnodeVMgmt.h"
typedef struct { typedef struct {
...@@ -23,9 +24,8 @@ typedef struct { ...@@ -23,9 +24,8 @@ typedef struct {
char pCont[]; char pCont[];
} SMgmtMsg; } SMgmtMsg;
static taos_qset tsMgmtQset = NULL; static SWorkerPool tsVMgmtWP;
static taos_queue tsMgmtQueue = NULL; static taos_queue tsVMgmtQueue = NULL;
static pthread_t tsQthread;
static void * dnodeProcessMgmtQueue(void *param); static void * dnodeProcessMgmtQueue(void *param);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
...@@ -47,45 +47,23 @@ int32_t dnodeInitVMgmt() { ...@@ -47,45 +47,23 @@ int32_t dnodeInitVMgmt() {
int32_t code = vnodeInitMgmt(); int32_t code = vnodeInitMgmt();
if (code != TSDB_CODE_SUCCESS) return -1; if (code != TSDB_CODE_SUCCESS) return -1;
tsMgmtQset = taosOpenQset(); tsVMgmtWP.name = "vmgmt";
if (tsMgmtQset == NULL) { tsVMgmtWP.workerFp = dnodeProcessMgmtQueue;
dError("failed to create the vmgmt queue set"); tsVMgmtWP.min = 1;
return -1; tsVMgmtWP.max = 1;
} if (tWorkerInit(&tsVMgmtWP) != 0) return -1;
tsMgmtQueue = taosOpenQueue();
if (tsMgmtQueue == NULL) {
dError("failed to create the vmgmt queue");
return -1;
}
taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL);
pthread_attr_t thAttr; tsVMgmtQueue = tWorkerAllocQueue(&tsVMgmtWP, NULL);
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process vmgmt queue, reason:%s", strerror(errno));
return -1;
}
dInfo("dnode vmgmt is initialized"); dInfo("dnode vmgmt is initialized");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeCleanupVMgmt() { void dnodeCleanupVMgmt() {
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset); tWorkerFreeQueue(&tsVMgmtWP, tsVMgmtQueue);
if (tsQthread) pthread_join(tsQthread, NULL); tWorkerCleanup(&tsVMgmtWP);
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
if (tsMgmtQset) taosCloseQset(tsMgmtQset);
tsMgmtQset = NULL;
tsMgmtQueue = NULL;
tsVMgmtQueue = NULL;
vnodeCleanupMgmt(); vnodeCleanupMgmt();
} }
...@@ -97,7 +75,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { ...@@ -97,7 +75,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
pMgmt->rpcMsg = *pMsg; pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont; pMgmt->rpcMsg.pCont = pMgmt->pCont;
memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen); memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt); taosWriteQitem(tsVMgmtQueue, TAOS_QTYPE_RPC, pMgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -112,16 +90,18 @@ void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) { ...@@ -112,16 +90,18 @@ void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
static void *dnodeProcessMgmtQueue(void *param) { static void *dnodeProcessMgmtQueue(void *wparam) {
SMgmtMsg *pMgmt; SWorker * pWorker = wparam;
SRpcMsg * pMsg; SWorkerPool *pPool = pWorker->pPool;
SRpcMsg rsp = {0}; SMgmtMsg * pMgmt;
int32_t qtype; SRpcMsg * pMsg;
void * handle; SRpcMsg rsp = {0};
int32_t qtype;
void * handle;
while (1) { while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) { if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset); dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset);
break; break;
} }
......
...@@ -16,66 +16,35 @@ ...@@ -16,66 +16,35 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tqueue.h" #include "tqueue.h"
#include "tworker.h"
#include "dnodeVRead.h" #include "dnodeVRead.h"
typedef struct {
pthread_t thread; // thread
int32_t workerId; // worker ID
} SVReadWorker;
typedef struct {
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
SVReadWorker * worker;
pthread_mutex_t mutex;
} SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *pWorker); static void *dnodeProcessReadQueue(void *pWorker);
// module global variable // module global variable
static SVReadWorkerPool tsVReadWP; static SWorkerPool tsVQueryWP;
static taos_qset tsVReadQset; static SWorkerPool tsVFetchWP;
int32_t dnodeInitVRead() { int32_t dnodeInitVRead() {
tsVReadQset = taosOpenQset(); tsVQueryWP.name = "vquery";
tsVQueryWP.workerFp = dnodeProcessReadQueue;
tsVReadWP.min = tsNumOfCores; tsVQueryWP.min = tsNumOfCores;
tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore; tsVQueryWP.max = tsNumOfCores/* * tsNumOfThreadsPerCore*/;
if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min; // if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min;
tsVReadWP.worker = calloc(sizeof(SVReadWorker), tsVReadWP.max); if (tWorkerInit(&tsVQueryWP) != 0) return -1;
pthread_mutex_init(&tsVReadWP.mutex, NULL);
tsVFetchWP.name = "vfetch";
if (tsVReadWP.worker == NULL) return -1; tsVFetchWP.workerFp = dnodeProcessReadQueue;
for (int i = 0; i < tsVReadWP.max; ++i) { tsVFetchWP.min = MIN(4, tsNumOfCores);
SVReadWorker *pWorker = tsVReadWP.worker + i; tsVFetchWP.max = tsVFetchWP.min;
pWorker->workerId = i; if (tWorkerInit(&tsVFetchWP) != 0) return -1;
}
dInfo("dnode vread is initialized, min worker:%d max worker:%d", tsVReadWP.min, tsVReadWP.max);
return 0; return 0;
} }
void dnodeCleanupVRead() { void dnodeCleanupVRead() {
for (int i = 0; i < tsVReadWP.max; ++i) { tWorkerCleanup(&tsVFetchWP);
SVReadWorker *pWorker = tsVReadWP.worker + i; tWorkerCleanup(&tsVQueryWP);
if (pWorker->thread) {
taosQsetThreadResume(tsVReadQset);
}
}
for (int i = 0; i < tsVReadWP.max; ++i) {
SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
}
free(tsVReadWP.worker);
taosCloseQset(tsVReadQset);
pthread_mutex_destroy(&tsVReadWP.mutex);
dInfo("dnode vread is closed");
} }
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
...@@ -88,6 +57,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { ...@@ -88,6 +57,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
assert(pHead->contLen > 0);
void *pVnode = vnodeAcquire(pHead->vgId); void *pVnode = vnodeAcquire(pHead->vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg); int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
...@@ -107,43 +77,20 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { ...@@ -107,43 +77,20 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
void *dnodeAllocVReadQueue(void *pVnode) { void *dnodeAllocVQueryQueue(void *pVnode) {
pthread_mutex_lock(&tsVReadWP.mutex); return tWorkerAllocQueue(&tsVQueryWP, pVnode);
taos_queue queue = taosOpenQueue(); }
if (queue == NULL) {
pthread_mutex_unlock(&tsVReadWP.mutex);
return NULL;
}
taosAddIntoQset(tsVReadQset, queue, pVnode);
// spawn a thread to process queue
if (tsVReadWP.num < tsVReadWP.max) {
do {
SVReadWorker *pWorker = tsVReadWP.worker + tsVReadWP.num;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
dError("failed to create thread to process vread vqueue since %s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
tsVReadWP.num++;
dDebug("dnode vread worker:%d is launched, total:%d", pWorker->workerId, tsVReadWP.num);
} while (tsVReadWP.num < tsVReadWP.min);
}
pthread_mutex_unlock(&tsVReadWP.mutex); void *dnodeAllocVFetchQueue(void *pVnode) {
dDebug("pVnode:%p, dnode vread queue:%p is allocated", pVnode, queue); return tWorkerAllocQueue(&tsVFetchWP, pVnode);
}
return queue; void dnodeFreeVQueryQueue(void *pQqueue) {
tWorkerFreeQueue(&tsVQueryWP, pQqueue);
} }
void dnodeFreeVReadQueue(void *pRqueue) { void dnodeFreeVFetchQueue(void *pFqueue) {
taosCloseQueue(pRqueue); tWorkerFreeQueue(&tsVFetchWP, pFqueue);
} }
void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
...@@ -160,18 +107,20 @@ void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { ...@@ -160,18 +107,20 @@ void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
} }
static void *dnodeProcessReadQueue(void *pWorker) { static void *dnodeProcessReadQueue(void *wparam) {
SVReadMsg *pRead; SWorker * pWorker = wparam;
int32_t qtype; SWorkerPool *pPool = pWorker->pPool;
void * pVnode; SVReadMsg * pRead;
int32_t qtype;
void * pVnode;
while (1) { while (1) {
if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pRead, &pVnode) == 0) { if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pRead, &pVnode) == 0) {
dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset); dDebug("dnode vquery got no message from qset:%p, exiting", pPool->qset);
break; break;
} }
dTrace("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle, dTrace("msg:%p, app:%p type:%s will be processed in vquery queue, qtype:%d", pRead, pRead->rpcAhandle,
taosMsg[pRead->msgType], qtype); taosMsg[pRead->msgType], qtype);
int32_t code = vnodeProcessRead(pVnode, pRead); int32_t code = vnodeProcessRead(pVnode, pRead);
......
...@@ -49,8 +49,10 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); ...@@ -49,8 +49,10 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode); void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *pWqueue); void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode); void *dnodeAllocVQueryQueue(void *pVnode);
void dnodeFreeVReadQueue(void *pRqueue); void *dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue);
int32_t dnodeAllocateMPeerQueue(); int32_t dnodeAllocateMPeerQueue();
void dnodeFreeMPeerQueue(); void dnodeFreeMPeerQueue();
......
...@@ -473,7 +473,7 @@ typedef struct { ...@@ -473,7 +473,7 @@ typedef struct {
int16_t numOfGroupCols; // num of group by columns int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx; int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx int16_t orderType; // used in group by xx order by xxx
int64_t tableLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query. int16_t prjOrder; // global order in super table projection query.
int64_t limit; int64_t limit;
int64_t offset; int64_t offset;
......
...@@ -34,6 +34,7 @@ typedef struct { ...@@ -34,6 +34,7 @@ typedef struct {
void * rpcHandle; void * rpcHandle;
void * rpcAhandle; void * rpcAhandle;
void * qhandle; void * qhandle;
void * pVnode;
int8_t qtype; int8_t qtype;
int8_t msgType; int8_t msgType;
SRspRet rspRet; SRspRet rspRet;
......
...@@ -140,6 +140,11 @@ typedef struct SQueryCostInfo { ...@@ -140,6 +140,11 @@ typedef struct SQueryCostInfo {
uint64_t numOfTimeWindows; uint64_t numOfTimeWindows;
} SQueryCostInfo; } SQueryCostInfo;
typedef struct {
int64_t vgroupLimit;
int64_t ts;
} SOrderedPrjQueryInfo;
typedef struct SQuery { typedef struct SQuery {
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
...@@ -167,6 +172,7 @@ typedef struct SQuery { ...@@ -167,6 +172,7 @@ typedef struct SQuery {
tFilePage** sdata; tFilePage** sdata;
STableQueryInfo* current; STableQueryInfo* current;
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
} SQuery; } SQuery;
...@@ -185,7 +191,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -185,7 +191,7 @@ typedef struct SQueryRuntimeEnv {
void* pQueryHandle; void* pQueryHandle;
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // false bool topBotQuery; // TODO used bitwise flag
bool groupbyNormalCol; // denote if this is a groupby normal column query bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo;// if the time window start/end required interpolation
...@@ -210,14 +216,13 @@ enum { ...@@ -210,14 +216,13 @@ enum {
typedef struct SQInfo { typedef struct SQInfo {
void* signature; void* signature;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
void* tsdb; void* tsdb;
SMemRef memRef; SMemRef memRef;
int32_t vgId; int32_t vgId;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
// SArray* arrTableIdInfo;
SHashObj* arrTableIdInfo; SHashObj* arrTableIdInfo;
int32_t groupIndex; int32_t groupIndex;
...@@ -233,6 +238,7 @@ typedef struct SQInfo { ...@@ -233,6 +238,7 @@ typedef struct SQInfo {
tsem_t ready; tsem_t ready;
int32_t dataReady; // denote if query result is ready or not int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp
} SQInfo; } SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -128,11 +128,14 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { ...@@ -128,11 +128,14 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
static void setQueryStatus(SQuery *pQuery, int8_t status); static void setQueryStatus(SQuery *pQuery, int8_t status);
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) static int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2;
}
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
...@@ -2138,8 +2141,31 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2138,8 +2141,31 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
} }
static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
return pQInfo->rspContext != NULL;
}
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
static bool isQueryKilled(SQInfo *pQInfo) {
if (IS_QUERY_KILLED(pQInfo)) {
return true;
}
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
if (pQInfo->owner != 0 && ((taosGetTimestampSec() - pQInfo->startExecTs) > getMaximumIdleDurationSec()) &&
(!needBuildResAfterQueryComplete(pQInfo))) {
assert(pQInfo->startExecTs != 0);
qDebug("QInfo:%p retrieve not arrive beyond %d sec, abort current query execution, start:%"PRId64", current:%d", pQInfo, 1,
pQInfo->startExecTs, taosGetTimestampSec());
return true;
}
return false;
}
static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;}
static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
...@@ -2864,7 +2890,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2864,7 +2890,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -3432,7 +3458,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -3432,7 +3458,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
int64_t startt = taosGetTimestampMs(); int64_t startt = taosGetTimestampMs();
while (1) { while (1) {
if (IS_QUERY_KILLED(pQInfo)) { if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
tfree(pTableList); tfree(pTableList);
...@@ -4018,7 +4044,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { ...@@ -4018,7 +4044,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
cond.twindow.skey, cond.twindow.ekey); cond.twindow.skey, cond.twindow.ekey);
// check if query is killed or not // check if query is killed or not
if (IS_QUERY_KILLED(pQInfo)) { if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
} }
...@@ -4675,7 +4701,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4675,7 +4701,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -5112,7 +5138,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -5112,7 +5138,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) { if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -5479,19 +5505,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5479,19 +5505,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// return; // return;
// } // }
if (pQuery->prjInfo.vgroupLimit != -1) {
assert(pQuery->limit.limit == -1 && pQuery->limit.offset == 0);
} else if (pQuery->limit.limit != -1) {
assert(pQuery->prjInfo.vgroupLimit == -1);
}
bool hasMoreBlock = true; bool hasMoreBlock = true;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SQueryCostInfo *summary = &pRuntimeEnv->summary; SQueryCostInfo *summary = &pRuntimeEnv->summary;
while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) { while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) {
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) { if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
STableQueryInfo **pTableQueryInfo = STableQueryInfo **pTableQueryInfo =
(STableQueryInfo **)taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); (STableQueryInfo **) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
break; break;
} }
...@@ -5503,6 +5535,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5503,6 +5535,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb); setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb);
} }
if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->current->windowResInfo.size > pQuery->prjInfo.vgroupLimit) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
// it is a super table ordered projection query, check for the number of output for each vgroup
if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->rec.rows >= pQuery->prjInfo.vgroupLimit) {
if (QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.skey >= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
} else if (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.ekey <= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
}
uint32_t status = 0; uint32_t status = 0;
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL; SArray *pDataBlock = NULL;
...@@ -5520,6 +5571,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5520,6 +5571,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
ensureOutputBuffer(pRuntimeEnv, &blockInfo); ensureOutputBuffer(pRuntimeEnv, &blockInfo);
int64_t prev = getNumOfResult(pRuntimeEnv);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
...@@ -5530,17 +5583,30 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5530,17 +5583,30 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery->rec.rows = getNumOfResult(pRuntimeEnv); pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
int64_t inc = pQuery->rec.rows - prev;
pQuery->current->windowResInfo.size += (int32_t) inc;
// the flag may be set by tableApplyFunctionsOnBlock, clear it here // the flag may be set by tableApplyFunctionsOnBlock, clear it here
CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED); CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED);
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
skipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed if (pQuery->prjInfo.vgroupLimit >= 0) {
if (limitResults(pRuntimeEnv)) { if (((pQuery->rec.rows + pQuery->rec.total) < pQuery->prjInfo.vgroupLimit) || ((pQuery->rec.rows + pQuery->rec.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) {
setQueryStatus(pQuery, QUERY_COMPLETED); if (QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts < blockInfo.window.ekey) {
SET_STABLE_QUERY_OVER(pQInfo); pQuery->prjInfo.ts = blockInfo.window.ekey;
break; } else if (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts > blockInfo.window.skey) {
pQuery->prjInfo.ts = blockInfo.window.skey;
}
}
} else {
// the limitation of output result is reached, set the query completed
skipResults(pRuntimeEnv);
if (limitResults(pRuntimeEnv)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo);
break;
}
} }
// while the output buffer is full or limit/offset is applied, query may be paused here // while the output buffer is full or limit/offset is applied, query may be paused here
...@@ -5582,7 +5648,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5582,7 +5648,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) {
if (IS_QUERY_KILLED(pQInfo)) { if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -5768,7 +5834,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -5768,7 +5834,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el); qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el);
// query error occurred or query is killed, abort current execution // query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -5789,7 +5855,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -5789,7 +5855,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
//TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead //TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
...@@ -5905,7 +5971,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5905,7 +5971,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
pQuery->rec.rows = getNumOfResult(pRuntimeEnv); pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
doSecondaryArithmeticProcess(pQuery); doSecondaryArithmeticProcess(pQuery);
if (IS_QUERY_KILLED(pQInfo)) { if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -6284,7 +6350,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -6284,7 +6350,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset);
pQueryMsg->limit = htobe64(pQueryMsg->limit); pQueryMsg->limit = htobe64(pQueryMsg->limit);
pQueryMsg->offset = htobe64(pQueryMsg->offset); pQueryMsg->offset = htobe64(pQueryMsg->offset);
pQueryMsg->tableLimit = htobe64(pQueryMsg->tableLimit); pQueryMsg->vgroupLimit = htobe64(pQueryMsg->vgroupLimit);
pQueryMsg->order = htons(pQueryMsg->order); pQueryMsg->order = htons(pQueryMsg->order);
pQueryMsg->orderColId = htons(pQueryMsg->orderColId); pQueryMsg->orderColId = htons(pQueryMsg->orderColId);
...@@ -6885,6 +6951,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6885,6 +6951,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->fillType = pQueryMsg->fillType; pQuery->fillType = pQueryMsg->fillType;
pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->numOfTags = pQueryMsg->numOfTags;
pQuery->tagColList = pTagCols; pQuery->tagColList = pTagCols;
pQuery->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit;
pQuery->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) { if (pQuery->colList == NULL) {
...@@ -7479,7 +7547,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -7479,7 +7547,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pthread_mutex_lock(&pQInfo->lock); pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY; pQInfo->dataReady = QUERY_RESULT_READY;
buildRes = (pQInfo->rspContext != NULL); buildRes = needBuildResAfterQueryComplete(pQInfo);
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is // clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
// put into task to be executed. // put into task to be executed.
...@@ -7488,6 +7556,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -7488,6 +7556,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
// used in retrieve blocking model.
tsem_post(&pQInfo->ready); tsem_post(&pQInfo->ready);
return buildRes; return buildRes;
} }
...@@ -7504,7 +7573,9 @@ bool qTableQuery(qinfo_t qinfo) { ...@@ -7504,7 +7573,9 @@ bool qTableQuery(qinfo_t qinfo) {
return false; return false;
} }
if (IS_QUERY_KILLED(pQInfo)) { pQInfo->startExecTs = taosGetTimestampSec();
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
return doBuildResCheck(pQInfo); return doBuildResCheck(pQInfo);
} }
...@@ -7536,7 +7607,7 @@ bool qTableQuery(qinfo_t qinfo) { ...@@ -7536,7 +7607,7 @@ bool qTableQuery(qinfo_t qinfo) {
} }
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
if (IS_QUERY_KILLED(pQInfo)) { if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo); qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) { } else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
...@@ -7564,30 +7635,31 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -7564,30 +7635,31 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
#if _NON_BLOCKING_RETRIEVE if (tsHalfCoresForQuery) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; pQInfo->rspContext = pRspContext;
tsem_wait(&pQInfo->ready);
pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true; *buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, code = pQInfo->code;
pQInfo->code);
} else { } else {
*buildRes = false; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
}
code = pQInfo->code; pthread_mutex_lock(&pQInfo->lock);
pthread_mutex_unlock(&pQInfo->lock);
#else assert(pQInfo->rspContext == NULL);
tsem_wait(&pQInfo->ready); if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true; *buildRes = true;
code = pQInfo->code; qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->rowSize,
#endif pQuery->rec.rows, tstrerror(pQInfo->code));
} else {
*buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
}
code = pQInfo->code;
pthread_mutex_unlock(&pQInfo->lock);
}
return code; return code;
} }
...@@ -7655,7 +7727,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) { ...@@ -7655,7 +7727,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) {
} }
SQuery* pQuery = pQInfo->runtimeEnv.pQuery; SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
return IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER); return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
} }
int32_t qKillQuery(qinfo_t qinfo) { int32_t qKillQuery(qinfo_t qinfo) {
...@@ -7952,8 +8024,6 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { ...@@ -7952,8 +8024,6 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
return NULL; return NULL;
} }
const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2 * 1000;
SQueryMgmt *pQueryMgmt = pMgmt; SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) { if (pQueryMgmt->qinfoPool == NULL) {
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo); qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
...@@ -7969,7 +8039,8 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { ...@@ -7969,7 +8039,8 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
return NULL; return NULL;
} else { } else {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo; TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_QHANDLE_LIFE_SPAN); void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
(getMaximumIdleDurationSec()*1000));
// pthread_mutex_unlock(&pQueryMgmt->lock); // pthread_mutex_unlock(&pQueryMgmt->lock);
return handle; return handle;
......
...@@ -234,7 +234,13 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { ...@@ -234,7 +234,13 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
} }
int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
double v = GET_DOUBLE_VAL(value); double v = 0;
if (pBucket->type == TSDB_DATA_TYPE_FLOAT) {
v = GET_FLOAT_VAL(value);
} else {
v = GET_DOUBLE_VAL(value);
}
int32_t index = -1; int32_t index = -1;
if (pBucket->range.dMinVal == DBL_MAX) { if (pBucket->range.dMinVal == DBL_MAX) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TWORKER_H
#define TDENGINE_TWORKER_H
#ifdef __cplusplus
extern "C" {
#endif
typedef void *(*FWorkerThread)(void *pWorker);
struct SWorkerPool;
typedef struct {
pthread_t thread; // thread
int32_t id; // worker ID
struct SWorkerPool *pPool;
} SWorker;
typedef struct SWorkerPool {
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
void * qset;
char * name;
SWorker *worker;
FWorkerThread workerFp;
pthread_mutex_t mutex;
} SWorkerPool;
int32_t tWorkerInit(SWorkerPool *pPool);
void tWorkerCleanup(SWorkerPool *pPool);
void * tWorkerAllocQueue(SWorkerPool *pPool, void *ahandle);
void tWorkerFreeQueue(SWorkerPool *pPool, void *pQueue);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
#include "tqueue.h"
#include "tworker.h"
int32_t tWorkerInit(SWorkerPool *pPool) {
pPool->qset = taosOpenQset();
pPool->worker = calloc(sizeof(SWorker), pPool->max);
pthread_mutex_init(&pPool->mutex, NULL);
for (int i = 0; i < pPool->max; ++i) {
SWorker *pWorker = pPool->worker + i;
pWorker->id = i;
pWorker->pPool = pPool;
}
uInfo("worker:%s is initialized, min:%d max:%d", pPool->name, pPool->min, pPool->max);
return 0;
}
void tWorkerCleanup(SWorkerPool *pPool) {
for (int i = 0; i < pPool->max; ++i) {
SWorker *pWorker = pPool->worker + i;
if(taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(pPool->qset);
}
}
for (int i = 0; i < pPool->max; ++i) {
SWorker *pWorker = pPool->worker + i;
if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL);
}
}
free(pPool->worker);
taosCloseQset(pPool->qset);
pthread_mutex_destroy(&pPool->mutex);
uInfo("worker:%s is closed", pPool->name);
}
void *tWorkerAllocQueue(SWorkerPool *pPool, void *ahandle) {
pthread_mutex_lock(&pPool->mutex);
taos_queue pQueue = taosOpenQueue();
if (pQueue == NULL) {
pthread_mutex_unlock(&pPool->mutex);
return NULL;
}
taosAddIntoQset(pPool->qset, pQueue, ahandle);
// spawn a thread to process queue
if (pPool->num < pPool->max) {
do {
SWorker *pWorker = pPool->worker + pPool->num;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, pPool->workerFp, pWorker) != 0) {
uError("worker:%s:%d failed to create thread to process since %s", pPool->name, pWorker->id, strerror(errno));
}
pthread_attr_destroy(&thAttr);
pPool->num++;
uDebug("worker:%s:%d is launched, total:%d", pPool->name, pWorker->id, pPool->num);
} while (pPool->num < pPool->min);
}
pthread_mutex_unlock(&pPool->mutex);
uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pPool->name, pQueue, ahandle);
return pQueue;
}
void tWorkerFreeQueue(SWorkerPool *pPool, void *pQueue) {
taosCloseQueue(pQueue);
uDebug("worker:%s, queue:%p is freed", pPool->name, pQueue);
}
...@@ -47,8 +47,9 @@ typedef struct { ...@@ -47,8 +47,9 @@ typedef struct {
int8_t isCommiting; int8_t isCommiting;
uint64_t version; // current version uint64_t version; // current version
uint64_t fversion; // version on saved data file uint64_t fversion; // version on saved data file
void * wqueue; void * wqueue; // write queue
void * rqueue; void * qqueue; // read query queue
void * fqueue; // read fetch/cancel queue
void * wal; void * wal;
void * tsdb; void * tsdb;
int64_t sync; int64_t sync;
......
...@@ -212,8 +212,9 @@ int32_t vnodeOpen(int32_t vgId) { ...@@ -212,8 +212,9 @@ int32_t vnodeOpen(int32_t vgId) {
pVnode->fversion = pVnode->version; pVnode->fversion = pVnode->version;
pVnode->wqueue = dnodeAllocVWriteQueue(pVnode); pVnode->wqueue = dnodeAllocVWriteQueue(pVnode);
pVnode->rqueue = dnodeAllocVReadQueue(pVnode); pVnode->qqueue = dnodeAllocVQueryQueue(pVnode);
if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) { pVnode->fqueue = dnodeAllocVFetchQueue(pVnode);
if (pVnode->wqueue == NULL || pVnode->qqueue == NULL || pVnode->fqueue == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
} }
...@@ -373,9 +374,14 @@ void vnodeDestroy(SVnodeObj *pVnode) { ...@@ -373,9 +374,14 @@ void vnodeDestroy(SVnodeObj *pVnode) {
pVnode->wqueue = NULL; pVnode->wqueue = NULL;
} }
if (pVnode->rqueue) { if (pVnode->qqueue) {
dnodeFreeVReadQueue(pVnode->rqueue); dnodeFreeVQueryQueue(pVnode->qqueue);
pVnode->rqueue = NULL; pVnode->qqueue = NULL;
}
if (pVnode->fqueue) {
dnodeFreeVFetchQueue(pVnode->fqueue);
pVnode->fqueue = NULL;
} }
tfree(pVnode->rootDir); tfree(pVnode->rootDir);
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#define _NON_BLOCKING_RETRIEVE 0
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tqueue.h" #include "tqueue.h"
...@@ -25,12 +24,12 @@ ...@@ -25,12 +24,12 @@
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
int32_t vnodeInitRead(void) { int32_t vnodeInitRead(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
return 0; return 0;
} }
...@@ -115,13 +114,16 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt ...@@ -115,13 +114,16 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
} }
pRead->qtype = qtype; pRead->qtype = qtype;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
atomic_add_fetch_32(&pVnode->queuedRMsg, 1); atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
taosWriteQitem(pVnode->rqueue, qtype, pRead); if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
return TSDB_CODE_SUCCESS; vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
return taosWriteQitem(pVnode->fqueue, qtype, pRead);
} else {
vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
return taosWriteQitem(pVnode->qqueue, qtype, pRead);
}
} }
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
...@@ -197,26 +199,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -197,26 +199,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// qHandle needs to be freed correctly // qHandle needs to be freed correctly
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pRead->pCont; vError("error rpc msg in query, %s", tstrerror(pRead->code));
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pRead->rpcHandle);
assert(pRead->contLen > 0 && killQueryMsg->free == 1);
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
pRead->rpcHandle);
} else {
assert(*qhandle == (void *)killQueryMsg->qhandle);
qKillQuery(*qhandle);
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
}
return TSDB_CODE_TSC_QUERY_CANCELLED;
} }
// assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL);
// if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
// SCancelQueryMsg *pMsg = (SCancelQueryMsg *)pRead->pCont;
//// pMsg->free = htons(killQueryMsg->free);
// pMsg->qhandle = htobe64(pMsg->qhandle);
//
// vWarn("QInfo:%p connection %p broken, kill query", (void *)pMsg->qhandle, pRead->rpcHandle);
//// assert(pRead->contLen > 0 && pMsg->free == 1);
//
// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pMsg->qhandle);
// if (qhandle == NULL || *qhandle == NULL) {
// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pMsg->qhandle, pRead->rpcHandle);
// } else {
// assert(*qhandle == (void *)pMsg->qhandle);
//
// qKillQuery(*qhandle);
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
// }
//
// return TSDB_CODE_TSC_QUERY_CANCELLED;
// }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void ** handle = NULL; void ** handle = NULL;
...@@ -338,11 +343,12 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -338,11 +343,12 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, invalid handle in retrieving result, code:0x%08x, QInfo:%p", pVnode->vgId, code, (void *)pRetrieve->qhandle); vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet); vnodeBuildNoResultQueryRsp(pRet);
return code; return code;
} }
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle); qKillQuery(*handle);
...@@ -373,8 +379,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -373,8 +379,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true; freeHandle = true;
} else { // result is not ready, return immediately } else { // result is not ready, return immediately
assert(buildRes == true);
// Only effects in the non-blocking model // Only effects in the non-blocking model
if (!tsHalfCoresForQuery) { if (!tsHalfCoresForQuery) {
if (!buildRes) { if (!buildRes) {
...@@ -401,12 +405,11 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -401,12 +405,11 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// notify connection(handle) that current qhandle is created, if current connection from // notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately. // client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
killQueryMsg->qhandle = htobe64((uint64_t)qhandle); pMsg->qhandle = htobe64((uint64_t)qhandle);
killQueryMsg->free = htons(1); pMsg->header.vgId = htonl(vgId);
killQueryMsg->header.vgId = htonl(vgId); pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg)); return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg));
} }
// execute this before anything else, including requesting any time on an agent
if (currentBuild.rawBuild.getCauses().toString().contains('BranchIndexingCause')) {
print "INFO: Build skipped due to trigger being Branch Indexing"
currentBuild.result = 'ABORTED' // optional, gives a better hint to the user that it's been skipped, rather than the default which shows it's successful
return
}
properties([pipelineTriggers([githubPush()])]) properties([pipelineTriggers([githubPush()])])
node { node {
git url: 'https://github.com/liuyq-617/TDengine' git url: 'https://github.com/taosdata/TDengine.git'
} }
def pre_test(){ def pre_test(){
......
...@@ -50,7 +50,7 @@ static void queryDB(TAOS *taos, char *command) { ...@@ -50,7 +50,7 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql); taos_free_result(pSql);
} }
void Test(char *qstr, const char *input, int i); void Test(TAOS *taos, char *qstr, int i);
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
char qstr[1024]; char qstr[1024];
...@@ -63,21 +63,22 @@ int main(int argc, char *argv[]) { ...@@ -63,21 +63,22 @@ int main(int argc, char *argv[]) {
// init TAOS // init TAOS
taos_init(); taos_init();
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
exit(1);
}
for (int i = 0; i < 4000000; i++) { for (int i = 0; i < 4000000; i++) {
Test(qstr, argv[1], i); Test(taos, qstr, i);
} }
taos_close(taos);
taos_cleanup(); taos_cleanup();
} }
void Test(char *qstr, const char *input, int index) { void Test(TAOS *taos, char *qstr, int index) {
TAOS *taos = taos_connect(input, "root", "taosdata", NULL, 0);
printf("==================test at %d\n================================", index); printf("==================test at %d\n================================", index);
queryDB(taos, "drop database if exists demo"); queryDB(taos, "drop database if exists demo");
queryDB(taos, "create database demo"); queryDB(taos, "create database demo");
TAOS_RES *result; TAOS_RES *result;
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
exit(1);
}
queryDB(taos, "use demo"); queryDB(taos, "use demo");
queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))"); queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))");
...@@ -131,6 +132,5 @@ void Test(char *qstr, const char *input, int index) { ...@@ -131,6 +132,5 @@ void Test(char *qstr, const char *input, int index) {
taos_free_result(result); taos_free_result(result);
printf("====demo end====\n\n"); printf("====demo end====\n\n");
taos_close(taos);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册