From a27fcc52b8e9fcca4a0aace0e8fda9d917ab7905 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 29 Nov 2019 11:50:15 +0800 Subject: [PATCH] refactor part of code --- src/system/detail/src/vnodeImport.c | 101 ++++++++++++---------------- 1 file changed, 44 insertions(+), 57 deletions(-) diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 4d1d671096..07dd7e237f 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -1257,6 +1257,37 @@ int isCacheEnd(SBlockIter iter, SMeterObj *pMeter) { return ((iter.slot == slot) && (iter.pos == pos)); } +static void vnodeFlushMergeBuffer(SMergeBuffer *pBuffer, SBlockIter *pWriteIter, SBlockIter *pCacheIter, + SMeterObj *pObj, SCacheInfo *pInfo, int checkBound) { + // Function to flush the merge buffer data to cache + if (pWriteIter->pos == pObj->pointsPerBlock) { + pWriteIter->pos = 0; + pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks; + } + + while (pBuffer->spos != pBuffer->epos) { + if (checkBound && pWriteIter->slot == pCacheIter->slot && pWriteIter->pos == pCacheIter->pos) break; + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy(pInfo->cacheBlocks[pWriteIter->slot]->offset[col] + pObj->schema[col].bytes * pWriteIter->pos, + pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes); + } + + if (pWriteIter->pos + 1 < pObj->pointsPerBlock) { + (pWriteIter->pos)++; + } else { + pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos + 1; + pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks; + pWriteIter->pos = 0; + } + + pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows; + } + + if ((!checkBound) && pWriteIter->pos != 0) { + pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos; + } +} + int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int rows) { SMeterObj * pObj = pImport->pObj; SVnodeObj * pVnode = vnodeList + pObj->vnode; @@ -1353,35 +1384,13 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int if ((payloadIter >= rows) && isCacheIterEnd) break; if ((pBuffer->epos + 1) % pBuffer->totalRows == pBuffer->spos) { // merge buffer is full, flush - if (writeIter.pos == pObj->pointsPerBlock) { - writeIter.pos = 0; - writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks; - } - - while (pBuffer->spos != pBuffer->epos) { - if (writeIter.slot == cacheIter.slot && writeIter.pos == cacheIter.pos) break; - for (int col = 0; col < pObj->numOfColumns; col++) { - memcpy(pInfo->cacheBlocks[writeIter.slot]->offset[col] + pObj->schema[col].bytes * writeIter.pos, - pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes); - } - - if (writeIter.pos + 1 < pObj->pointsPerBlock) { - writeIter.pos++; - } else { - pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos + 1; - writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks; - writeIter.pos = 0; - } - - pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows; - } + vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 1); } - if ((payloadIter >= rows) || - ((!isCacheIterEnd) && - (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) > - KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), - cacheIter.pos)))) { // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache + TSKEY payloadKey = (payloadIter < rows) ? KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) : INT64_MAX; + TSKEY cacheKey = (isCacheIterEnd) ? INT64_MAX : KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), cacheIter.pos); + + if (cacheKey < payloadKey) { // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache for (int col = 0; col < pObj->numOfColumns; col++) { memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos, pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos, @@ -1389,11 +1398,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int } FORWARD_ITER(cacheIter, 1, pInfo->maxBlocks, pObj->pointsPerBlock); isCacheIterEnd = isCacheEnd(cacheIter, pObj); - } else if ((isCacheIterEnd) || - ((payloadIter < rows) && - (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < - KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), - cacheIter.pos)))) { // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload + } else if (cacheKey > payloadKey) { // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload if (availPoints == 0) { // Need to allocate a new cache block pthread_mutex_lock(&(pPool->vmutex)); // TODO: Need to check if there are enough slots to hold a new one @@ -1482,29 +1487,11 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int pBuffer->epos = (pBuffer->epos + 1) % pBuffer->totalRows; } - if (pBuffer->spos != pBuffer->epos) { - if (writeIter.pos == pObj->pointsPerBlock) { - writeIter.pos = 0; - writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks; - } - while (pBuffer->spos != pBuffer->epos) { - for (int col = 0; col < pObj->numOfColumns; col++) { - memcpy(pInfo->cacheBlocks[writeIter.slot]->offset[col] + pObj->schema[col].bytes * writeIter.pos, - pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes); - } - - if (writeIter.pos + 1 < pObj->pointsPerBlock) { - writeIter.pos++; - } else { - pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos + 1; - writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks; - writeIter.pos = 0; - } - - pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows; - } - - if (writeIter.pos != 0) pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos; + if (pBuffer->spos != pBuffer->epos) { // Flush the remaining data in the merge buffer + vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 0); + } else { + // Should never come here + assert(false); } if (isAppendData) { @@ -1514,9 +1501,9 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int } } pImport->importedRows += rowsImported; - __sync_fetch_and_sub(&(pObj->freePoints), rowsImported); + atomic_fetch_sub_32(&(pObj->freePoints), rowsImported); - code = 0; + code = TSDB_CODE_SUCCESS; _exit: tfree(pBuffer); -- GitLab