提交 7021d3f6 编写于 作者: H hjxilinx

refactor codes

上级 9e5ddfe4
...@@ -315,9 +315,9 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -315,9 +315,9 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
/* /*
* if the first column of the secondary query is not ts function, add this function. * if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting. * Because this column is required to filter with timestamp after intersecting.
*/ */
if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) { if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
} }
...@@ -349,8 +349,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -349,8 +349,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
...@@ -391,7 +390,10 @@ static void doQuitSubquery(SSqlObj* pParentSql) { ...@@ -391,7 +390,10 @@ static void doQuitSubquery(SSqlObj* pParentSql) {
} }
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) {
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { int32_t numOfTotal = pSupporter->pState->numOfCompleted;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished >= numOfTotal) {
pSqlObj->res.code = abs(pSupporter->pState->code); pSqlObj->res.code = abs(pSupporter->pState->code);
tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
...@@ -897,7 +899,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { ...@@ -897,7 +899,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
pTSBuf->f = fopen(pTSBuf->path, "r+"); pTSBuf->f = fopen(pTSBuf->path, "r+");
if (pTSBuf->f == NULL) { if (pTSBuf->f == NULL) {
free(pTSBuf); free(pTSBuf);
return NULL; return NULL;
} }
......
...@@ -239,10 +239,19 @@ typedef struct SQuery { ...@@ -239,10 +239,19 @@ typedef struct SQuery {
int lfd; // only for query in file, last file handle int lfd; // only for query in file, last file handle
SCompBlock *pBlock; // only for query in file SCompBlock *pBlock; // only for query in file
SField ** pFields; SField ** pFields;
int numOfBlocks; // only for query in file int numOfBlocks; // only for query in file
int blockBufferSize; // length of pBlock buffer int blockBufferSize; // length of pBlock buffer
int currentSlot; int currentSlot;
int firstSlot; int firstSlot;
/*
* the two parameters are utilized to handle the data missing situation, caused by import operation.
* When the commit slot is the first slot, and commitPoints != 0
*/
int32_t commitSlot; // which slot is committed,
int32_t commitPoint; // starting point for next commit
int slot; int slot;
int pos; int pos;
TSKEY key; TSKEY key;
......
...@@ -1146,6 +1146,32 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE ...@@ -1146,6 +1146,32 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE
// keep the structure as well as the block data into local buffer // keep the structure as well as the block data into local buffer
memcpy(&pRuntimeEnv->cacheBlock, pBlock, sizeof(SCacheBlock)); memcpy(&pRuntimeEnv->cacheBlock, pBlock, sizeof(SCacheBlock));
// the commit data points will be ignored
int32_t offset = 0;
int32_t numOfPoints = pBlock->numOfPoints;
if (pQuery->firstSlot == pQuery->commitSlot) {
assert(pQuery->commitPoint >= 0 && pQuery->commitPoint <= pBlock->numOfPoints);
offset = pQuery->commitPoint;
numOfPoints = pBlock->numOfPoints - offset;
if (offset != 0) {
dTrace("%p ignore the data in cache block that are commit already, numOfblock:%d slot:%d ignore points:%d. "
"first:%d last:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint,
pQuery->firstSlot, pQuery->currentSlot);
}
pBlock->numOfPoints = numOfPoints;
// current block are all commit already, ignore it
if (pBlock->numOfPoints == 0) {
dTrace("%p ignore current in cache block that are all commit already, numOfblock:%d slot:%d"
"first:%d last:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot,
pQuery->firstSlot, pQuery->currentSlot);
return NULL;
}
}
// keep the data from in cache into the temporarily allocated buffer // keep the data from in cache into the temporarily allocated buffer
for(int32_t i = 0; i < pQuery->numOfCols; ++i) { for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfoEx *pColumnInfoEx = &pQuery->colList[i]; SColumnInfoEx *pColumnInfoEx = &pQuery->colList[i];
...@@ -1164,9 +1190,9 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE ...@@ -1164,9 +1190,9 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE
assert(pCol->colId == pQuery->colList[i].data.colId && bytes == pColumnInfoEx->data.bytes && assert(pCol->colId == pQuery->colList[i].data.colId && bytes == pColumnInfoEx->data.bytes &&
type == pColumnInfoEx->data.type); type == pColumnInfoEx->data.type);
memcpy(dst, pBlock->offset[columnIndex], pBlock->numOfPoints * bytes); memcpy(dst, pBlock->offset[columnIndex] + offset * bytes, numOfPoints * bytes);
} else { } else {
setNullN(dst, type, bytes, pBlock->numOfPoints); setNullN(dst, type, bytes, numOfPoints);
} }
} }
...@@ -2500,18 +2526,24 @@ void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t v ...@@ -2500,18 +2526,24 @@ void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t v
// commitSlot here denotes the first uncommitted block in cache // commitSlot here denotes the first uncommitted block in cache
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t lastSlot = 0; int32_t lastSlot = 0;
int32_t commitSlot = 0;
int32_t commitPoint = 0;
SCachePool *pPool = (SCachePool *)vnodeList[vid].pCachePool; SCachePool *pPool = (SCachePool *)vnodeList[vid].pCachePool;
pthread_mutex_lock(&pPool->vmutex); pthread_mutex_lock(&pPool->vmutex);
numOfBlocks = pCacheInfo->numOfBlocks; numOfBlocks = pCacheInfo->numOfBlocks;
lastSlot = pCacheInfo->currentSlot; lastSlot = pCacheInfo->currentSlot;
commitSlot = pCacheInfo->commitSlot;
commitPoint = pCacheInfo->commitPoint;
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
// make sure it is there, otherwise, return right away // make sure it is there, otherwise, return right away
pQuery->currentSlot = lastSlot; pQuery->currentSlot = lastSlot;
pQuery->numOfBlocks = numOfBlocks; pQuery->numOfBlocks = numOfBlocks;
pQuery->firstSlot = getFirstCacheSlot(numOfBlocks, lastSlot, pCacheInfo); pQuery->firstSlot = getFirstCacheSlot(numOfBlocks, lastSlot, pCacheInfo);
pQuery->commitSlot = commitSlot;
pQuery->commitPoint = commitPoint;
/* /*
* Note: the block id is continuous increasing, never becomes smaller. * Note: the block id is continuous increasing, never becomes smaller.
* *
...@@ -4437,7 +4469,7 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl ...@@ -4437,7 +4469,7 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
pSummary->fileTimeUs += (taosGetTimestampUs() - start); pSummary->fileTimeUs += (taosGetTimestampUs() - start);
} else { } else {
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true)); assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot); SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK); *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册