未验证 提交 95aa47c4 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1291 from taosdata/feature/liaohj

avoid the retry efforts to consume too many CPU time in getting first…
......@@ -1157,24 +1157,22 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeE
offset = pQuery->commitPoint;
numOfPoints = pNewBlock->numOfPoints - offset;
if (offset != 0) {
dTrace(
"%p ignore the data in cache block that are commit already, numOfblock:%d slot:%d ignore points:%d. "
"first:%d last:%d",
GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint, pQuery->firstSlot,
pQuery->currentSlot);
}
pNewBlock->numOfPoints = numOfPoints;
// current block are all commit already, ignore it
if (pNewBlock->numOfPoints == 0) {
dTrace(
"%p ignore current in cache block that are all commit already, numOfblock:%d slot:%d"
"first:%d last:%d",
GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->firstSlot, pQuery->currentSlot);
return NULL;
if (offset != 0) {
if (pNewBlock->numOfPoints > 0) {
dTrace("QInfo:%p ignore the data in cache block that are commit already, numOfBlock:%d slot:%d ignore points:%d "
"remain:%d first:%d last:%d",
GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint, pNewBlock->numOfPoints,
pQuery->firstSlot, pQuery->currentSlot);
} else {
// current block are all commit already, ignore it
assert (pNewBlock->numOfPoints == 0);
dTrace("QInfo:%p ignore points in cache block that are all commit already, numOfBlock:%d slot:%d ignore points:%d "
"first:%d last:%d",
GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, slot, offset, pQuery->firstSlot, pQuery->currentSlot);
return NULL;
}
}
}
......@@ -3113,19 +3111,19 @@ static bool cacheBoundaryCheck(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterO
// earliest key in cache
TSKEY keyFirst = 0;
TSKEY keyLast = pMeterObj->lastKey;
// keep the value in local variable, since it may be changed by other thread any time
int32_t numOfBlocks = pCacheInfo->numOfBlocks;
int32_t currentSlot = pCacheInfo->currentSlot;
// no data in cache, return false directly
if (numOfBlocks == 0) {
return false;
}
int32_t first = getFirstCacheSlot(numOfBlocks, currentSlot, pCacheInfo);
while (1) {
// keep the value in local variable, since it may be changed by other thread any time
int32_t numOfBlocks = pCacheInfo->numOfBlocks;
int32_t currentSlot = pCacheInfo->currentSlot;
// no data in cache, return false directly
if (numOfBlocks == 0) {
return false;
}
int32_t first = getFirstCacheSlot(numOfBlocks, currentSlot, pCacheInfo);
/*
* pBlock may be null value since this block is flushed to disk, and re-distributes to
* other meter, so go on until we get the first not flushed cache block.
......@@ -3137,9 +3135,12 @@ static bool cacheBoundaryCheck(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterO
/*
* there may be only one empty cache block existed caused by import.
*/
if (numOfBlocks == 1) {
if (first == currentSlot || numOfBlocks == 1) {
return false;
}
// todo use defined macro
first = (first + 1 + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks;
}
}
......@@ -4950,7 +4951,10 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
*/
void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
STableQuerySupportObj *pSupporter = pQInfo->pTableQuerySupporter;
if (pSupporter != NULL) {
assert(pSupporter->numOfMeters >= 1);
}
if (pSupporter == NULL || pSupporter->numOfMeters == 1) {
atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1);
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册