Commit a27fcc52 authored by Hongze Cheng

refactor part of code

Parent 56bb3121
@@ -1257,6 +1257,37 @@ int isCacheEnd(SBlockIter iter, SMeterObj *pMeter) {
return ((iter.slot == slot) && (iter.pos == pos));
}
static void vnodeFlushMergeBuffer(SMergeBuffer *pBuffer, SBlockIter *pWriteIter, SBlockIter *pCacheIter,
SMeterObj *pObj, SCacheInfo *pInfo, int checkBound) {
// Function to flush the merge buffer data to cache
if (pWriteIter->pos == pObj->pointsPerBlock) {
pWriteIter->pos = 0;
pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks;
}
while (pBuffer->spos != pBuffer->epos) {
if (checkBound && pWriteIter->slot == pCacheIter->slot && pWriteIter->pos == pCacheIter->pos) break;
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pInfo->cacheBlocks[pWriteIter->slot]->offset[col] + pObj->schema[col].bytes * pWriteIter->pos,
pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes);
}
if (pWriteIter->pos + 1 < pObj->pointsPerBlock) {
(pWriteIter->pos)++;
} else {
pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos + 1;
pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks;
pWriteIter->pos = 0;
}
pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows;
}
if ((!checkBound) && pWriteIter->pos != 0) {
pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos;
}
}
int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int rows) {
SMeterObj * pObj = pImport->pObj;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
@@ -1353,35 +1384,13 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
if ((payloadIter >= rows) && isCacheIterEnd) break;
if ((pBuffer->epos + 1) % pBuffer->totalRows == pBuffer->spos) { // merge buffer is full, flush
if (writeIter.pos == pObj->pointsPerBlock) {
writeIter.pos = 0;
writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks;
}
while (pBuffer->spos != pBuffer->epos) {
if (writeIter.slot == cacheIter.slot && writeIter.pos == cacheIter.pos) break;
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pInfo->cacheBlocks[writeIter.slot]->offset[col] + pObj->schema[col].bytes * writeIter.pos,
pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes);
}
if (writeIter.pos + 1 < pObj->pointsPerBlock) {
writeIter.pos++;
} else {
pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos + 1;
writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks;
writeIter.pos = 0;
}
pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows;
}
vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 1);
}
if ((payloadIter >= rows) ||
((!isCacheIterEnd) &&
(KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >
KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY),
cacheIter.pos)))) { // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache
TSKEY payloadKey = (payloadIter < rows) ? KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) : INT64_MAX;
TSKEY cacheKey = (isCacheIterEnd) ? INT64_MAX : KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), cacheIter.pos);
if (cacheKey < payloadKey) { // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
@@ -1389,11 +1398,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
}
FORWARD_ITER(cacheIter, 1, pInfo->maxBlocks, pObj->pointsPerBlock);
isCacheIterEnd = isCacheEnd(cacheIter, pObj);
} else if ((isCacheIterEnd) ||
((payloadIter < rows) &&
(KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY),
cacheIter.pos)))) { // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload
} else if (cacheKey > payloadKey) { // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload
if (availPoints == 0) { // Need to allocate a new cache block
pthread_mutex_lock(&(pPool->vmutex));
// TODO: Need to check if there are enough slots to hold a new one
@@ -1482,29 +1487,11 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
pBuffer->epos = (pBuffer->epos + 1) % pBuffer->totalRows;
}
if (pBuffer->spos != pBuffer->epos) {
if (writeIter.pos == pObj->pointsPerBlock) {
writeIter.pos = 0;
writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks;
}
while (pBuffer->spos != pBuffer->epos) {
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pInfo->cacheBlocks[writeIter.slot]->offset[col] + pObj->schema[col].bytes * writeIter.pos,
pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes);
}
if (writeIter.pos + 1 < pObj->pointsPerBlock) {
writeIter.pos++;
} else {
pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos + 1;
writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks;
writeIter.pos = 0;
}
pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows;
}
if (writeIter.pos != 0) pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos;
if (pBuffer->spos != pBuffer->epos) { // Flush the remaining data in the merge buffer
vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 0);
} else {
// Should never come here
assert(false);
}
if (isAppendData) {
@@ -1514,9 +1501,9 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
}
}
pImport->importedRows += rowsImported;
__sync_fetch_and_sub(&(pObj->freePoints), rowsImported);
atomic_fetch_sub_32(&(pObj->freePoints), rowsImported);
code = 0;
code = TSDB_CODE_SUCCESS;
_exit:
tfree(pBuffer);
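
For context on the helper introduced above: vnodeFlushMergeBuffer drains a circular merge buffer (spos/epos are ring indices over totalRows slots) into fixed-size cache blocks, advancing a write iterator that wraps across blocks and sealing each block's numOfPoints as it fills; when checkBound is set it also stops once the write iterator catches up with the cache read iterator. The sketch below reproduces only that shape with simplified stand-in types (RingBuf, Block, Iter and flushRing are illustrative names, not TDengine structures), so it is a minimal illustration of the pattern rather than the committed implementation.

#include <stdio.h>
#include <string.h>

#define ROWS_PER_BLOCK 4
#define NUM_BLOCKS     3
#define RING_ROWS      8

/* Hypothetical stand-ins for SCacheBlock / SMergeBuffer / SBlockIter. */
typedef struct { int rows[ROWS_PER_BLOCK]; int numOfPoints; } Block;
typedef struct { int rows[RING_ROWS]; int spos, epos; } RingBuf;
typedef struct { int slot, pos; } Iter;

/* Drain the ring buffer into the blocks, mirroring the shape of vnodeFlushMergeBuffer:
 * wrap the write iterator when a block fills up, and optionally stop when it
 * catches up with a second (read) iterator. */
static void flushRing(RingBuf *rb, Iter *w, const Iter *bound, Block blocks[], int checkBound) {
  if (w->pos == ROWS_PER_BLOCK) { w->pos = 0; w->slot = (w->slot + 1) % NUM_BLOCKS; }
  while (rb->spos != rb->epos) {
    if (checkBound && w->slot == bound->slot && w->pos == bound->pos) break;
    blocks[w->slot].rows[w->pos] = rb->rows[rb->spos];
    if (w->pos + 1 < ROWS_PER_BLOCK) {
      w->pos++;
    } else {                                   /* block is full: seal it and move on */
      blocks[w->slot].numOfPoints = w->pos + 1;
      w->slot = (w->slot + 1) % NUM_BLOCKS;
      w->pos = 0;
    }
    rb->spos = (rb->spos + 1) % RING_ROWS;
  }
  if (!checkBound && w->pos != 0) blocks[w->slot].numOfPoints = w->pos;  /* seal a partial block */
}

int main(void) {
  Block blocks[NUM_BLOCKS] = {0};
  RingBuf rb = {{10, 20, 30, 40, 50, 0, 0, 0}, 0, 5};   /* five buffered rows */
  Iter write = {0, 0}, bound = {2, 0};
  flushRing(&rb, &write, &bound, blocks, 0);            /* final flush: no bound check */
  for (int b = 0; b < NUM_BLOCKS; b++)
    printf("block %d holds %d points\n", b, blocks[b].numOfPoints);
  return 0;
}

Running it prints how many rows each block ends up holding, which is the same bookkeeping the numOfPoints assignments in the diff perform.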
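
The other simplification in this commit replaces the two long boundary-checking conditions with sentinel keys: when the payload or the cache iterator is exhausted, its key is treated as INT64_MAX, so a single ordinary comparison decides which side to consume next. Below is a minimal, self-contained sketch of that merge shape with plain arrays standing in for the payload and the cache block; mergeKeys and its parameters are illustrative only, and the equal-key branch is just a placeholder, since the commit's handling of duplicate timestamps falls outside the hunks shown here.

#include <stdint.h>
#include <stdio.h>

typedef int64_t TSKEY;

/* Merge two key-sorted streams; INT64_MAX marks an exhausted side, so one
 * comparison picks the side to consume, matching the
 * "cacheKey < payloadKey" / "cacheKey > payloadKey" split in the diff. */
static void mergeKeys(const TSKEY *payload, int nPayload, const TSKEY *cache, int nCache) {
  int pi = 0, ci = 0;
  while (pi < nPayload || ci < nCache) {
    TSKEY payloadKey = (pi < nPayload) ? payload[pi] : INT64_MAX;
    TSKEY cacheKey   = (ci < nCache)   ? cache[ci]   : INT64_MAX;
    if (cacheKey < payloadKey) {
      printf("consume cache   key %lld\n", (long long)cacheKey);
      ci++;
    } else if (cacheKey > payloadKey) {
      printf("consume payload key %lld\n", (long long)payloadKey);
      pi++;
    } else {                 /* equal keys: placeholder, not taken from the shown hunks */
      printf("equal keys      key %lld\n", (long long)payloadKey);
      pi++; ci++;
    }
  }
}

int main(void) {
  TSKEY payload[] = {2, 4, 5};
  TSKEY cache[]   = {1, 4, 6};
  mergeKeys(payload, sizeof(payload) / sizeof(payload[0]),
            cache,   sizeof(cache)   / sizeof(cache[0]));
  return 0;
}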