提交 0c914564 编写于 作者: H Haojun Liao

Merge branch 'main' into feature/3_liaohj

...@@ -195,7 +195,7 @@ typedef struct SDataBlockInfo { ...@@ -195,7 +195,7 @@ typedef struct SDataBlockInfo {
uint32_t capacity; uint32_t capacity;
SBlockID id; SBlockID id;
int16_t hasVarCol; int16_t hasVarCol;
int16_t dataLoad; // denote if the data is loaded or not int16_t dataLoad; // denote if the data is loaded or not
// TODO: optimize and remove following // TODO: optimize and remove following
int64_t version; // used for stream, and need serialization int64_t version; // used for stream, and need serialization
...@@ -204,8 +204,8 @@ typedef struct SDataBlockInfo { ...@@ -204,8 +204,8 @@ typedef struct SDataBlockInfo {
STimeWindow calWin; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize
TSKEY watermark; // used for stream TSKEY watermark; // used for stream
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
STag* pTag; // used for stream partition STag* pTag; // used for stream partition
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {
...@@ -239,22 +239,22 @@ typedef struct SVarColAttr { ...@@ -239,22 +239,22 @@ typedef struct SVarColAttr {
// pBlockAgg->numOfNull == info.rows, all data are null // pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null. // pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData { typedef struct SColumnInfoData {
char* pData; // the corresponding block data in memory char* pData; // the corresponding block data in memory
union { union {
char* nullbitmap; // bitmap, one bit for each item in the list char* nullbitmap; // bitmap, one bit for each item in the list
SVarColAttr varmeta; SVarColAttr varmeta;
}; };
SColumnInfo info; // column info SColumnInfo info; // column info
bool hasNull; // if current column data has null value. bool hasNull; // if current column data has null value.
} SColumnInfoData; } SColumnInfoData;
typedef struct SQueryTableDataCond { typedef struct SQueryTableDataCond {
uint64_t suid; uint64_t suid;
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols; int32_t numOfCols;
SColumnInfo* colList; SColumnInfo* colList;
int32_t* pSlotList; // the column output destation slot, and it may be null int32_t* pSlotList; // the column output destation slot, and it may be null
int32_t type; // data block load type: int32_t type; // data block load type:
STimeWindow twindows; STimeWindow twindows;
int64_t startVersion; int64_t startVersion;
int64_t endVersion; int64_t endVersion;
...@@ -300,6 +300,7 @@ typedef struct STableBlockDistInfo { ...@@ -300,6 +300,7 @@ typedef struct STableBlockDistInfo {
int32_t firstSeekTimeUs; int32_t firstSeekTimeUs;
uint32_t numOfInmemRows; uint32_t numOfInmemRows;
uint32_t numOfSmallBlocks; uint32_t numOfSmallBlocks;
uint32_t numOfVgroups;
int32_t blockRowsHisto[20]; int32_t blockRowsHisto[20];
} STableBlockDistInfo; } STableBlockDistInfo;
......
...@@ -26,12 +26,12 @@ extern "C" { ...@@ -26,12 +26,12 @@ extern "C" {
typedef struct SQWorkerPool SQWorkerPool; typedef struct SQWorkerPool SQWorkerPool;
typedef struct SWWorkerPool SWWorkerPool; typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker { typedef struct SQueueWorker {
int32_t id; // worker id int32_t id; // worker id
int64_t pid; // thread pid int64_t pid; // thread pid
TdThread thread; // thread id TdThread thread; // thread id
void *pool; void *pool;
} SQWorker; } SQueueWorker;
typedef struct SQWorkerPool { typedef struct SQWorkerPool {
int32_t max; // max number of workers int32_t max; // max number of workers
...@@ -39,7 +39,7 @@ typedef struct SQWorkerPool { ...@@ -39,7 +39,7 @@ typedef struct SQWorkerPool {
int32_t num; // current number of workers int32_t num; // current number of workers
STaosQset *qset; STaosQset *qset;
const char *name; const char *name;
SQWorker *workers; SQueueWorker *workers;
TdThreadMutex mutex; TdThreadMutex mutex;
} SQWorkerPool; } SQWorkerPool;
......
...@@ -347,7 +347,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { ...@@ -347,7 +347,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
continue; continue;
} }
if (pRequest->killed) { if (pRequest->killed || 0 == pRequest->body.queryJob) {
releaseRequest(*rid); releaseRequest(*rid);
pIter = taosHashIterate(pObj->pRequests, pIter); pIter = taosHashIterate(pObj->pRequests, pIter);
continue; continue;
......
...@@ -58,7 +58,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode); ...@@ -58,7 +58,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct SQWorker SQHandle; typedef struct SQueueWorker SQHandle;
typedef struct { typedef struct {
const char *name; const char *name;
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SQWorker SQHandle; typedef struct SQueueWorker SQHandle;
typedef struct SQnode { typedef struct SQnode {
int32_t qndId; int32_t qndId;
......
...@@ -58,7 +58,7 @@ typedef struct STQ STQ; ...@@ -58,7 +58,7 @@ typedef struct STQ STQ;
typedef struct SVState SVState; typedef struct SVState SVState;
typedef struct SVStatis SVStatis; typedef struct SVStatis SVStatis;
typedef struct SVBufPool SVBufPool; typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle; typedef struct SQueueWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapReader SMetaSnapReader;
typedef struct SMetaSnapWriter SMetaSnapWriter; typedef struct SMetaSnapWriter SMetaSnapWriter;
......
...@@ -81,10 +81,10 @@ typedef struct MergeIndex { ...@@ -81,10 +81,10 @@ typedef struct MergeIndex {
} MergeIndex; } MergeIndex;
typedef struct SBlockDistInfo { typedef struct SBlockDistInfo {
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
STsdbReader* pHandle; STsdbReader* pHandle;
SReadHandle readHandle; SReadHandle readHandle;
uint64_t uid; // table uid uint64_t uid; // table uid
} SBlockDistInfo; } SBlockDistInfo;
static int32_t sysChkFilter__Comm(SNode* pNode); static int32_t sysChkFilter__Comm(SNode* pNode);
...@@ -129,10 +129,10 @@ static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time" ...@@ -129,10 +129,10 @@ static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time"
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"}; static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName); static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
static void destroySysScanOperator(void* param); static void destroySysScanOperator(void* param);
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code); static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse); static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse);
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable, static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
...@@ -199,11 +199,11 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) { ...@@ -199,11 +199,11 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
if (func == NULL) return -1; if (func == NULL) return -1;
SMetaFltParam param = {.suid = 0, SMetaFltParam param = {.suid = 0,
.cid = 0, .cid = 0,
.type = TSDB_DATA_TYPE_VARCHAR, .type = TSDB_DATA_TYPE_VARCHAR,
.val = pVal->datum.p, .val = pVal->datum.p,
.reverse = reverse, .reverse = reverse,
.filterFunc = func}; .filterFunc = func};
return -1; return -1;
} }
...@@ -218,11 +218,11 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) { ...@@ -218,11 +218,11 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
if (func == NULL) return -1; if (func == NULL) return -1;
SMetaFltParam param = {.suid = 0, SMetaFltParam param = {.suid = 0,
.cid = 0, .cid = 0,
.type = TSDB_DATA_TYPE_BIGINT, .type = TSDB_DATA_TYPE_BIGINT,
.val = &pVal->datum.i, .val = &pVal->datum.i,
.reverse = reverse, .reverse = reverse,
.filterFunc = func}; .filterFunc = func};
int32_t ret = metaFilterCreateTime(pMeta, &param, result); int32_t ret = metaFilterCreateTime(pMeta, &param, result);
return ret; return ret;
...@@ -350,9 +350,9 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt); ...@@ -350,9 +350,9 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name, static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode); void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
const char* name, SSDataBlock* pBlock); SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) { __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) { if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
*reverse = true; *reverse = true;
} }
...@@ -516,9 +516,10 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -516,9 +516,10 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaTbCursorPrev(pInfo->pCur); metaTbCursorPrev(pInfo->pCur);
blockFull = true; blockFull = true;
} else { } else {
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock); sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
dataBlock);
} }
metaReaderClear(&smrSuperTable); metaReaderClear(&smrSuperTable);
if (blockFull || numOfRows >= pOperator->resultInfo.capacity) { if (blockFull || numOfRows >= pOperator->resultInfo.capacity) {
...@@ -1343,7 +1344,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1343,7 +1344,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
return pBlock->info.rows > 0? pBlock:NULL; return pBlock->info.rows > 0 ? pBlock : NULL;
} else { } else {
return NULL; return NULL;
} }
...@@ -1357,7 +1358,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan ...@@ -1357,7 +1358,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
if (pInfo->tbnameSlotId != -1) { if (pInfo->tbnameSlotId != -1) {
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId); SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0}; char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
memcpy(varDataVal(varTbName), name, strlen(name)); memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name)); varDataSetLen(varTbName, strlen(name));
...@@ -1489,10 +1490,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1489,10 +1490,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL); pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroySysScanOperator(pInfo); destroySysScanOperator(pInfo);
} }
...@@ -1927,7 +1929,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { ...@@ -1927,7 +1929,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
// make the valgrind happy that all memory buffer has been initialized already. // make the valgrind happy that all memory buffer has been initialized already.
if (slotId != 0) { if (slotId != 0) {
SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0);
int64_t v = 0; int64_t v = 0;
colDataAppendInt64(p1, 0, &v); colDataAppendInt64(p1, 0, &v);
} }
...@@ -1937,10 +1939,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { ...@@ -1937,10 +1939,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
} }
static void destroyBlockDistScanOperatorInfo(void* param) { static void destroyBlockDistScanOperatorInfo(void* param) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock); blockDataDestroy(pDistInfo->pResBlock);
tsdbReaderClose(pDistInfo->pHandle); tsdbReaderClose(pDistInfo->pHandle);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
...@@ -2001,7 +2003,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -2001,7 +2003,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
} }
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->uid = (pBlockScanNode->suid != 0)? pBlockScanNode->suid:pBlockScanNode->uid; pInfo->uid = (pBlockScanNode->suid != 0) ? pBlockScanNode->suid : pBlockScanNode->uid;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
...@@ -2012,8 +2014,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -2012,8 +2014,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo); OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo,
createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
return pOperator; return pOperator;
_error: _error:
......
...@@ -775,13 +775,13 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { ...@@ -775,13 +775,13 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
int32_t rowIndex); int32_t rowIndex);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
int32_t currentRow = pBlock->info.rows; int32_t currentRow = pBlock->info.rows;
...@@ -795,7 +795,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -795,7 +795,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
((int64_t*)pCol->pData)[currentRow] = pRes->v; ((int64_t*)pCol->pData)[currentRow] = pRes->v;
// colDataAppendInt64(pCol, currentRow, &pRes->v); // colDataAppendInt64(pCol, currentRow, &pRes->v);
break; break;
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
...@@ -1691,7 +1691,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { ...@@ -1691,7 +1691,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SVariant* pVal = &pCtx->param[1].param; SVariant* pVal = &pCtx->param[1].param;
int32_t code = 0; int32_t code = 0;
double v = 0; double v = 0;
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
...@@ -2070,7 +2070,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) { ...@@ -2070,7 +2070,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
} }
static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
SFirstLastRes* pInfo) { SFirstLastRes* pInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pCtx->subsidiaries.num <= 0) { if (pCtx->subsidiaries.num <= 0) {
...@@ -2123,7 +2123,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { ...@@ -2123,7 +2123,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
} }
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) { if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
pInputCol->hasNull == true) {
// save selectivity value for column consisted of all null values // save selectivity value for column consisted of all null values
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2239,7 +2240,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2239,7 +2240,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) { if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
pInputCol->hasNull == true) {
// save selectivity value for column consisted of all null values // save selectivity value for column consisted of all null values
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2333,7 +2335,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2333,7 +2335,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
char* data = colDataGetData(pInputCol, chosen); char* data = colDataGetData(pInputCol, chosen);
int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data); int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2344,7 +2346,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2344,7 +2346,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2361,7 +2363,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2361,7 +2363,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
numOfElems++; numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2408,7 +2410,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p ...@@ -2408,7 +2410,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
} }
static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
int32_t rowIndex) { int32_t rowIndex) {
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) { if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2435,7 +2437,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer ...@@ -2435,7 +2437,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
for (int32_t i = start; i < start + pInput->numOfRows; ++i) { for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data); SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2667,7 +2669,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, ...@@ -2667,7 +2669,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
} }
static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos,
int32_t order, int64_t ts) { int32_t order, int64_t ts) {
int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1; int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1;
pDiffInfo->prevTs = ts; pDiffInfo->prevTs = ts;
switch (type) { switch (type) {
...@@ -2875,8 +2877,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { ...@@ -2875,8 +2877,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
return pRes; return pRes;
} }
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery); static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
...@@ -2897,7 +2899,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { ...@@ -2897,7 +2899,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
} }
numOfElems++; numOfElems++;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true); int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2931,7 +2933,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { ...@@ -2931,7 +2933,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
} }
numOfElems++; numOfElems++;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2983,7 +2985,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par ...@@ -2983,7 +2985,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
} }
int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
SVariant val = {0}; SVariant val = {0};
...@@ -3174,7 +3176,7 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo ...@@ -3174,7 +3176,7 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo
if (pPage == NULL) { if (pPage == NULL) {
return NULL; return NULL;
} }
char* p = pPage->data + pPos->offset; char* p = pPage->data + pPos->offset;
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
return p; return p;
} else { } else {
...@@ -4635,7 +4637,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -4635,7 +4637,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
continue; continue;
} }
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
int32_t code = doReservoirSample(pCtx, pInfo, data, i); int32_t code = doReservoirSample(pCtx, pInfo, data, i);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -4655,7 +4657,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -4655,7 +4657,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
} }
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SSampleInfo* pInfo = getSampleOutputInfo(pCtx); SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
...@@ -4989,7 +4991,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { ...@@ -4989,7 +4991,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
} }
numOfElems++; numOfElems++;
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
int32_t code = doModeAdd(pInfo, i, pCtx, data); int32_t code = doModeAdd(pInfo, i, pCtx, data);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -5410,6 +5412,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { ...@@ -5410,6 +5412,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
if (pDistInfo->maxRows < p1.maxRows) { if (pDistInfo->maxRows < p1.maxRows) {
pDistInfo->maxRows = p1.maxRows; pDistInfo->maxRows = p1.maxRows;
} }
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) { for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
...@@ -5438,6 +5441,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist ...@@ -5438,6 +5441,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1; if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1; if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1;
...@@ -5469,6 +5473,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo ...@@ -5469,6 +5473,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1;
...@@ -5520,7 +5525,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -5520,7 +5525,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend(pColInfo, row++, st, false); colDataAppend(pColInfo, row++, st, false);
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables, len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables,
pData->numOfFiles, 0); pData->numOfFiles, pData->numOfVgroups);
varDataSetLen(st, len); varDataSetLen(st, len);
colDataAppend(pColInfo, row++, st, false); colDataAppend(pColInfo, row++, st, false);
......
...@@ -76,6 +76,7 @@ typedef struct SQWDebug { ...@@ -76,6 +76,7 @@ typedef struct SQWDebug {
bool lockEnable; bool lockEnable;
bool statusEnable; bool statusEnable;
bool dumpEnable; bool dumpEnable;
bool forceStop;
bool sleepSimulate; bool sleepSimulate;
bool deadSimulate; bool deadSimulate;
bool redirectSimulate; bool redirectSimulate;
...@@ -248,6 +249,7 @@ typedef struct SQWorkerMgmt { ...@@ -248,6 +249,7 @@ typedef struct SQWorkerMgmt {
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) #define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch) #define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
#define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1)
#define QW_SET_QTID(id, qId, tId, eId) \ #define QW_SET_QTID(id, qId, tId, eId) \
do { \ do { \
......
...@@ -9,11 +9,13 @@ ...@@ -9,11 +9,13 @@
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, SQWDebug gQWDebug = {.lockEnable = false,
.statusEnable = true,
.dumpEnable = false, .dumpEnable = false,
.redirectSimulate = false, .redirectSimulate = false,
.deadSimulate = false, .deadSimulate = false,
.sleepSimulate = false}; .sleepSimulate = false,
.forceStop = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) { if (!gQWDebug.statusEnable) {
...@@ -306,6 +308,12 @@ int32_t qwDbgEnableDebug(char *option) { ...@@ -306,6 +308,12 @@ int32_t qwDbgEnableDebug(char *option) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 == strcasecmp(option, "forceStop")) {
gQWDebug.forceStop = true;
qError("qw forceStop debug enabled");
return TSDB_CODE_SUCCESS;
}
qError("invalid qw debug option:%s", option); qError("invalid qw debug option:%s", option);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
......
...@@ -18,6 +18,51 @@ SQWorkerMgmt gQwMgmt = { ...@@ -18,6 +18,51 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0, .qwNum = 0,
}; };
int32_t qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, tId, sId;
int32_t eId;
int64_t rId = 0;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_LOCK(QW_WRITE, &ctx->lock);
sId = ctx->sId;
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
QW_TASK_DLOG_E("task running, async killed");
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
QW_TASK_DLOG_E("task fetching, update drop received");
} else {
qwDropTask(QW_FPARAMS());
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
...@@ -973,6 +1018,10 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { ...@@ -973,6 +1018,10 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
qwDbgDumpMgmtInfo(mgmt); qwDbgDumpMgmtInfo(mgmt);
if (gQWDebug.forceStop) {
(void)qwStopAllTasks(mgmt);
}
QW_LOCK(QW_READ, &mgmt->schLock); QW_LOCK(QW_READ, &mgmt->schLock);
int32_t schNum = taosHashGetSize(mgmt->schHash); int32_t schNum = taosHashGetSize(mgmt->schHash);
...@@ -1087,6 +1136,7 @@ _return: ...@@ -1087,6 +1136,7 @@ _return:
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) { if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
qError("invalid param to init qworker"); qError("invalid param to init qworker");
...@@ -1185,46 +1235,10 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { ...@@ -1185,46 +1235,10 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
uint64_t qId, tId, sId;
int32_t eId;
int64_t rId = 0;
atomic_store_8(&mgmt->nodeStopped, 1); atomic_store_8(&mgmt->nodeStopped, 1);
void *pIter = taosHashIterate(mgmt->ctxHash, NULL); (void)qwStopAllTasks(mgmt);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_LOCK(QW_WRITE, &ctx->lock);
sId = ctx->sId;
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} else {
qwDropTask(QW_FPARAMS());
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
} }
void qWorkerDestroy(void **qWorkerMgmt) { void qWorkerDestroy(void **qWorkerMgmt) {
......
...@@ -727,7 +727,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -727,7 +727,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->status = ConnNormal; conn->status = ConnNormal;
conn->broken = 0; conn->broken = false;
transRefCliHandle(conn); transRefCliHandle(conn);
atomic_add_fetch_32(&pThrd->connCount, 1); atomic_add_fetch_32(&pThrd->connCount, 1);
...@@ -997,6 +997,11 @@ static void cliDestroyBatch(SCliBatch* pBatch) { ...@@ -997,6 +997,11 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
taosMemoryFree(pBatch); taosMemoryFree(pBatch);
} }
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
if (pThrd->quit == true) {
cliDestroyBatch(pBatch);
return;
}
if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
return; return;
} }
...@@ -1082,17 +1087,23 @@ static void cliSendBatchCb(uv_write_t* req, int status) { ...@@ -1082,17 +1087,23 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
if (status != 0) { if (status != 0) {
tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
p->wLen, p->batchSize, uv_err_name(status)); p->wLen, p->batchSize, uv_err_name(status));
cliHandleExcept(conn);
if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);
cliHandleBatchReq(nxtBatch, thrd); cliHandleBatchReq(nxtBatch, thrd);
} else { } else {
tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
p->batchSize); p->batchSize);
if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
if (nxtBatch != NULL) { if (nxtBatch != NULL) {
conn->pBatch = nxtBatch; conn->pBatch = nxtBatch;
cliSendBatch(conn); cliSendBatch(conn);
} else {
addConnToPool(thrd->pool, conn);
}
} else { } else {
addConnToPool(thrd->pool, conn); cliDestroyBatch(nxtBatch);
// conn release by other callback
} }
} }
...@@ -1454,6 +1465,11 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1454,6 +1465,11 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg->type == Quit) {
pThrd->stopMsg = pMsg;
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd); (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
count++; count++;
...@@ -1485,6 +1501,12 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1485,6 +1501,12 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg->type == Quit) {
pThrd->stopMsg = pMsg;
continue;
}
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
...@@ -1582,7 +1604,6 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -1582,7 +1604,6 @@ static void cliAsyncCb(uv_async_t* handle) {
SCliThrd* pThrd = item->pThrd; SCliThrd* pThrd = item->pThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = NULL;
// batch process to avoid to lock/unlock frequently // batch process to avoid to lock/unlock frequently
queue wq; queue wq;
taosThreadMutexLock(&item->mtx); taosThreadMutexLock(&item->mtx);
...@@ -2285,22 +2306,8 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran ...@@ -2285,22 +2306,8 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK; return TSDB_CODE_RPC_BROKEN_LINK;
} }
/*if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) {
char key[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet);
uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet);
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
if (val != NULL && *val >= pTransInst->connLimitNum) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_MAX_SESSIONS;
}
}*/
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet; pCtx->epSet = *pEpSet;
pCtx->origEpSet = *pEpSet; pCtx->origEpSet = *pEpSet;
......
...@@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param); ...@@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) { int32_t tQWorkerInit(SQWorkerPool *pool) {
pool->qset = taosOpenQset(); pool->qset = taosOpenQset();
pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker)); pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
if (pool->workers == NULL) { if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -31,7 +31,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { ...@@ -31,7 +31,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
(void)taosThreadMutexInit(&pool->mutex, NULL); (void)taosThreadMutexInit(&pool->mutex, NULL);
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i; SQueueWorker *worker = pool->workers + i;
worker->id = i; worker->id = i;
worker->pool = pool; worker->pool = pool;
} }
...@@ -42,14 +42,14 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { ...@@ -42,14 +42,14 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
void tQWorkerCleanup(SQWorkerPool *pool) { void tQWorkerCleanup(SQWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i; SQueueWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset); taosQsetThreadResume(pool->qset);
} }
} }
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i; SQueueWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id); uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL); taosThreadJoin(worker->thread, NULL);
...@@ -65,7 +65,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) { ...@@ -65,7 +65,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name); uInfo("worker:%s is closed", pool->name);
} }
static void *tQWorkerThreadFp(SQWorker *worker) { static void *tQWorkerThreadFp(SQueueWorker *worker) {
SQWorkerPool *pool = worker->pool; SQWorkerPool *pool = worker->pool;
SQueueInfo qinfo = {0}; SQueueInfo qinfo = {0};
void *msg = NULL; void *msg = NULL;
...@@ -106,7 +106,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { ...@@ -106,7 +106,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
// spawn a thread to process queue // spawn a thread to process queue
if (pool->num < pool->max) { if (pool->num < pool->max) {
do { do {
SQWorker *worker = pool->workers + pool->num; SQueueWorker *worker = pool->workers + pool->num;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); taosThreadAttrInit(&thAttr);
...@@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { ...@@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
pool->qset = taosOpenQset(); pool->qset = taosOpenQset();
pool->workers = taosArrayInit(2, sizeof(SQWorker *)); pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
if (pool->workers == NULL) { if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -153,14 +153,14 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { ...@@ -153,14 +153,14 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
int32_t size = taosArrayGetSize(pool->workers); int32_t size = taosArrayGetSize(pool->workers);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SQWorker *worker = taosArrayGetP(pool->workers, i); SQueueWorker *worker = taosArrayGetP(pool->workers, i);
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset); taosQsetThreadResume(pool->qset);
} }
} }
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SQWorker *worker = taosArrayGetP(pool->workers, i); SQueueWorker *worker = taosArrayGetP(pool->workers, i);
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id); uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL); taosThreadJoin(worker->thread, NULL);
...@@ -177,7 +177,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { ...@@ -177,7 +177,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name); uInfo("worker:%s is closed", pool->name);
} }
static void *tAutoQWorkerThreadFp(SQWorker *worker) { static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
SAutoQWorkerPool *pool = worker->pool; SAutoQWorkerPool *pool = worker->pool;
SQueueInfo qinfo = {0}; SQueueInfo qinfo = {0};
void *msg = NULL; void *msg = NULL;
...@@ -222,7 +222,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem ...@@ -222,7 +222,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
// spawn a thread to process queue // spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) { while (curWorkerNum < dstWorkerNum) {
SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker)); SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) { if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
uError("worker:%s:%d failed to create", pool->name, curWorkerNum); uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
taosMemoryFree(worker); taosMemoryFree(worker);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册