提交 3d0d6771 编写于 作者: A AlexDuan

longquery push

上级 f8e5a3a7
......@@ -76,6 +76,9 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo);
*/
int32_t qKillQuery(qinfo_t qinfo);
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount);
int32_t qQueryCompleted(qinfo_t qinfo);
/**
......@@ -94,6 +97,15 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId);
int64_t genQueryId(void);
// util
typedef struct {
int64_t qId;
int32_t timeMs;
} SLongQuery;
// return SArray* include SLongQuery*
void* qObtainLongQuery(void* qMgmt, int32_t longMs);
#ifdef __cplusplus
}
#endif
......
......@@ -35,6 +35,7 @@ int32_t* taosGetErrno();
#define terrno (*taosGetErrno())
#define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
// rpc
#define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //"Action in progress")
......
......@@ -192,6 +192,7 @@ typedef struct {
SList * bufBlockList;
int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO
int64_t commitedMs; // commited ms time , zero is no commit.
} SMemTable;
typedef struct {
......
......@@ -88,6 +88,9 @@ int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qt
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
// util
void* vnodeGetqMgmt(void* pVnode);
#ifdef __cplusplus
}
#endif
......
......@@ -219,6 +219,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
int waitMoment(SQInfo* pQInfo){
if(pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if(pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if(pos){
pos += 3;
......@@ -604,3 +607,87 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0;
}
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t error = TSDB_CODE_SUCCESS;
void** handle = qAcquireQInfo(pMgmt, qId);
if(handle == NULL) return terrno;
SQInfo* pQInfo = (SQInfo*)(*handle);
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:0x%"PRIx64" query killed by qid.", pQInfo->qId);
setQueryKilled(pQInfo);
// wait query stop
int32_t loop = 0;
while (pQInfo->owner != 0) {
taosMsleep(waitMs);
if(loop++ > waitCount){
error = TSDB_CODE_FAILED;
break;
}
}
return error;
}
int compareLongQuery(const void* p1, const void* p2) {
// sort desc
SLongQuery* plq1 = (SLongQuery*)p1;
SLongQuery* plq2 = (SLongQuery*)p2;
if(plq1->timeMs == plq2->timeMs) {
return 0;
} else if(plq1->timeMs > plq2->timeMs) {
return -1;
} else {
return 1;
}
}
// util
void* qObtainLongQuery(void* param, int32_t longMs){
SQueryMgmt* qMgmt = (SQueryMgmt*)param;
if(qMgmt == NULL || qMgmt->qinfoPool == NULL) return NULL;
SArray* qids = taosArrayInit(4, sizeof(int64_t*));
SHashObj* pHashTable = qMgmt->qinfoPool->pHashTable;
if(pHashTable == NULL || pHashTable->hashList == NULL) return NULL;
SQInfo * qInfo = (SQInfo*)taosHashIterate(pHashTable, NULL);
while(qInfo){
// judge long query
SMemTable* imem = qInfo->runtimeEnv.pQueryAttr->memRef.snapshot.imem;
if(imem == NULL || imem->commitedMs == 0) continue;
int64_t now = taosGetTimestampMs();
if(imem->commitedMs > now) continue; // weird, so skip
int32_t passMs = now - imem->commitedMs;
if(passMs < longMs) {
continue;
}
// push
SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery));
plq->timeMs = passMs;
plq->qId = qInfo->qId;
taosArrayPush(qids, plq);
// next
qInfo = (SQInfo*)taosHashIterate(pHashTable, qInfo);
}
size_t cnt = taosArrayGetSize(qids);
if(cnt == 0) {
taosArrayDestroyEx(qids, free);
return NULL;
}
if(cnt > 1) {
taosArraySort(qids, compareLongQuery);
}
return qids;
}
\ No newline at end of file
......@@ -43,4 +43,8 @@ SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode);
// health cite
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
#endif /* _TD_TSDB_BUFFER_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_HEALTH_H_
#define _TD_TSDB_HEALTH_H_
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool enoughIdleMemory();
bool allowNewBlock(STsdbRepo* pRepo);
#endif /* _TD_TSDB_BUFFER_H_ */
......@@ -14,12 +14,10 @@
*/
#include "tsdbint.h"
#include "tsdbHealth.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbBufPool *tsdbNewBufPool() {
STsdbBufPool *pBufPool = (STsdbBufPool *)calloc(1, sizeof(*pBufPool));
......@@ -120,6 +118,14 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) {
// supply new Block
if(tsdbInsertNewBlock(pRepo) > 0) {
break;
} else {
// no newBlock, kill query free
tsdbUrgeQueryFree(pRepo);
}
pRepo->repoLocked = false;
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
pRepo->repoLocked = true;
......@@ -139,7 +145,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
}
// ---------------- LOCAL FUNCTIONS ----------------
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize);
if (pBufBlock == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -157,7 +163,7 @@ _err:
return NULL;
}
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) {
......@@ -199,4 +205,4 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) {
tsdbFreeBufBlock(pBufBlock);
free(pNode);
pPool->nBufBlocks--;
}
}
\ No newline at end of file
......@@ -547,6 +547,8 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
(void)tsdbLockRepo(pRepo);
pRepo->imem = NULL;
(void)tsdbUnlockRepo(pRepo);
//save commited time
pIMem->commitedMs = taosGetTimestampMs();
tsdbUnRefMemTable(pRepo, pIMem);
tsem_post(&(pRepo->readyToCommit));
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tarray.h"
#include "query.h"
#include "tglobal.h"
#include "tsdbint.h"
#include "tsdbBuffer.h"
#include "tsdbLog.h"
#include "tsdbHealth.h"
#include "tsdbint.h"
#include "ttimer.h"
#include "vnode.h"
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
int32_t cnt = 0;
if(enoughIdleMemory() && allowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) {
if (tsdbLockRepo(pRepo) >= 0) {
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
// append error
tsdbFreeBufBlock(pBufBlock);
} else {
pPool->nRecycleBlocks ++;
cnt ++ ;
}
tsdbUnlockRepo(pRepo);
}
}
}
return cnt;
}
// switch anther thread to run
void cbKillQueryFree(void* param1, void* param2) {
STsdbRepo* pRepo = (STsdbRepo*)param1;
int32_t longMs = 2000; // TODO config to taos.cfg
// vnode
void* vnodeObj = pRepo->appH.appH;
if(vnodeObj == NULL) return ;
// qMgmt
void* qMgmt = vnodeGetqMgmt(vnodeObj);
if(qMgmt == NULL) return ;
// qid top list
SArray *qids = (SArray*)qObtainLongQuery(qMgmt, longMs);
if(qids == NULL) return ;
// kill Query
size_t cnt = taosArrayGetSize(qids);
int64_t qId = 0;
for(size_t i=0; i < cnt; i++) {
qId = *(int64_t*)taosArrayGetP(qids, i);
qKillQueryByQId(qMgmt, qId, 100, 50); // wait 50*100 ms
// notify wait
pthread_cond_signal(&pRepo->pPool->poolNotEmpty);
// check break condition
if(enoughIdleMemory() && allowNewBlock(pRepo)) {
break;
}
}
// free qids
taosArrayDestroyEx(qids, free);
}
// return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
// 1 start timer
tmr_h hTimer = taosTmrStart(cbKillQueryFree, 1, pRepo, NULL);
return hTimer != NULL;
}
bool enoughIdleMemory(){
// TODO config to taos.cfg
int32_t lowestRate = 20; // below 20% idle memory, return not enough memory
float memoryUsedMB = 0;
float memoryAvailMB;
if (true != taosGetSysMemory(&memoryUsedMB)) {
tsdbWarn("tsdbHealth get memory error, return false.");
return false;
}
if(memoryUsedMB > tsTotalMemoryMB || tsTotalMemoryMB == 0) {
tsdbWarn("tsdbHealth used memory(%d MB) large total memory(%d MB), return false.", (int)memoryUsedMB, (int)tsTotalMemoryMB);
return false;
}
memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB;
int32_t rate = (int32_t)(memoryAvailMB/tsTotalMemoryMB * 100);
if(rate < lowestRate){
tsdbWarn("tsdbHealth real rate :%d less than lowest rate:%d, so return false.", rate, lowestRate);
return false;
}
return true;
}
bool allowNewBlock(STsdbRepo* pRepo){
//TODO config to taos.cfg
int32_t nElasticBlocks = 10;
STsdbBufPool* pPool = pRepo->pPool;
int32_t nOverBlocks = pPool->nBufBlocks - pRepo->config.totalBlocks;
if(nOverBlocks > nElasticBlocks) {
tsdbWarn("tsdbHealth allowNewBlock forbid. nOverBlocks(%d) > nElasticBlocks(%d)", nOverBlocks, nElasticBlocks);
return false;
}
return true;
}
......@@ -562,3 +562,9 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return 0;
}
// get qmgmt
void* vnodeGetqMgmt(void* pVnode){
if(pVnode == NULL) return NULL;
return ((SVnodeObj*)pVnode)->qMgmt;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册