#include "qtsbuf.h"
#include "tscompression.h"
#include "tutil.h"
#include "taoserror.h"

/*
 * File layout:
 *   [STSBufFileHeader][TS_COMP_FILE_VNODE_MAX x STSVnodeBlockInfo][comp block][comp block]...
 * where each comp block is written by writeDataToDisk as
 *   tag | numOfElem | compLen | payload (compLen bytes) | compLen
 * The trailing compLen allows the blocks to be traversed backwards.
 */

static int32_t getDataStartOffset(void);
static void    TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo);
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
static int32_t ensureBlockBufCapacity(STSBuf* pTSBuf, int32_t rawSize);

/**
 * Create an empty ts buffer backed by a newly created temporary file.
 *
 * @param autoDelete  when true, tsBufDestory unlinks the backing file
 * @return the new buffer, or NULL if the file or the in-memory buffers cannot be created
 */
STSBuf* tsBufCreate(bool autoDelete) {
  STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
  if (pTSBuf == NULL) {
    return NULL;
  }

  getTmpfilePath("join", pTSBuf->path);

  // binary mode: the file stores raw structs and compressed payloads
  pTSBuf->f = fopen(pTSBuf->path, "wb+");
  if (pTSBuf->f == NULL) {
    free(pTSBuf);
    return NULL;
  }

  // the temporary file was created here, so a failed initialization must remove it again
  pTSBuf->autoDelete = true;
  if (NULL == allocResForTSBuf(pTSBuf)) {
    return NULL;  // allocResForTSBuf has already released pTSBuf
  }

  // update the header info
  STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSDB_ORDER_ASC};
  STSBufUpdateHeader(pTSBuf, &header);

  tsBufResetPos(pTSBuf);
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  pTSBuf->autoDelete = autoDelete;
  pTSBuf->tsOrder = -1;

  return pTSBuf;
}

/**
 * Open an existing ts comp file and load its header and vnode index.
 *
 * @param path        path of the file to open (read/write)
 * @param autoDelete  when true, tsBufDestory unlinks the file
 * @return the buffer, or NULL if the file cannot be opened or is not a valid ts comp file
 */
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
  STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
  if (pTSBuf == NULL) {
    return NULL;
  }

  // pTSBuf is zero-filled, so copying at most PATH_MAX - 1 bytes always leaves the path NUL-terminated
  strncpy(pTSBuf->path, path, PATH_MAX - 1);

  pTSBuf->f = fopen(pTSBuf->path, "rb+");
  if (pTSBuf->f == NULL) {
    free(pTSBuf);
    return NULL;
  }

  // on failure allocResForTSBuf releases pTSBuf; autoDelete is still false, so the caller's file is kept
  if (allocResForTSBuf(pTSBuf) == NULL) {
    return NULL;
  }

  // validate the file magic number
  STSBufFileHeader header = {0};
  if (fseek(pTSBuf->f, 0, SEEK_SET) != 0 || fread(&header, sizeof(header), 1, pTSBuf->f) != 1 ||
      header.magic != TS_COMP_FILE_MAGIC) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  // the index area of the file only has room for TS_COMP_FILE_VNODE_MAX entries
  if (header.numOfVnode < 0 || header.numOfVnode > TS_COMP_FILE_VNODE_MAX) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  if (header.numOfVnode > pTSBuf->numOfAlloc) {
    STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * header.numOfVnode);
    if (tmp == NULL) {
      tsBufDestory(pTSBuf);
      return NULL;
    }

    // realloc leaves the new tail uninitialized, and the per-vnode length is not stored in the file
    memset(&tmp[pTSBuf->numOfAlloc], 0, sizeof(STSVnodeBlockInfoEx) * (header.numOfVnode - pTSBuf->numOfAlloc));
    pTSBuf->pData = tmp;
    pTSBuf->numOfAlloc = header.numOfVnode;
  }

  pTSBuf->numOfVnodes = header.numOfVnode;

  // check the ts order
  pTSBuf->tsOrder = header.tsOrder;
  if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) {
    // tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
    tsBufDestory(pTSBuf);
    return NULL;
  }

  // the vnode index directly follows the header;
  // the length value for each vnode is not kept in file, so it is not set here
  for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) {
    if (fread(&pTSBuf->pData[i].info, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f) != 1) {
      tsBufDestory(pTSBuf);
      return NULL;
    }
  }

  fseek(pTSBuf->f, 0, SEEK_END);

  struct stat fileStat;
  if (fstat(fileno(pTSBuf->f), &fileStat) != 0) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  pTSBuf->fileSize = (uint32_t)fileStat.st_size;
  tsBufResetPos(pTSBuf);

  // ascending by default
  pTSBuf->cur.order = TSDB_ORDER_ASC;
  pTSBuf->autoDelete = autoDelete;

  return pTSBuf;
}

/**
 * Release every resource held by the buffer and close its file; the file is unlinked when autoDelete is set.
 *
 * @return always NULL, so callers can write `p = tsBufDestory(p);`
 */
void* tsBufDestory(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return NULL;
  }

  tfree(pTSBuf->assistBuf);
  tfree(pTSBuf->tsData.rawBuf);
  tfree(pTSBuf->pData);
  tfree(pTSBuf->block.payload);

  if (pTSBuf->f != NULL) {
    fclose(pTSBuf->f);
  }

  if (pTSBuf->autoDelete) {
    unlink(pTSBuf->path);
  }

  free(pTSBuf);
  return NULL;
}

/* Return the last vnode entry of the in-memory index; the buffer must contain at least one vnode. */
static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) {
  int32_t last = pTSBuf->numOfVnodes - 1;
  assert(last >= 0);

  return &pTSBuf->pData[last];
}

/*
 * Append a new vnode entry whose data starts at the current end of file, persist it (and the previous
 * entry) into the file index, and rewrite the header.
 * Returns the new entry, or NULL if the in-memory index cannot be grown.
 */
static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
  if (pTSBuf->numOfAlloc <= pTSBuf->numOfVnodes) {
    int32_t newSize = pTSBuf->numOfAlloc + pTSBuf->numOfAlloc / 2;  // grow by 1.5x
    if (newSize <= pTSBuf->numOfAlloc) {
      newSize = pTSBuf->numOfAlloc + 1;  // tiny arrays must still grow
    }

    STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize);
    if (tmp == NULL) {
      return NULL;
    }

    pTSBuf->pData = tmp;
    pTSBuf->numOfAlloc = newSize;
    memset(&pTSBuf->pData[pTSBuf->numOfVnodes], 0, sizeof(STSVnodeBlockInfoEx) * (newSize - pTSBuf->numOfVnodes));
  }

  if (pTSBuf->numOfVnodes > 0) {
    STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);

    // update prev vnode length info in file
    TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info);
  }

  // set initial value for vnode block
  STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes].info;
  pBlockInfo->vnode = vnodeId;
  pBlockInfo->offset = pTSBuf->fileSize;
  assert(pBlockInfo->offset >= getDataStartOffset());

  // update vnode info in file
  TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes, pBlockInfo);

  // add one vnode info
  pTSBuf->numOfVnodes += 1;

  // update the header info
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};

  STSBufUpdateHeader(pTSBuf, &header);
  return tsBufGetLastVnodeInfo(pTSBuf);
}

/*
 * Shrink the raw buffer back to MEM_BUF_SIZE once it has grown to at least twice the threshold.
 * The buffer is left untouched if it still holds more than MEM_BUF_SIZE bytes or if realloc fails.
 */
static void shrinkBuffer(STSList* ptsData) {
  if (ptsData->allocSize < ptsData->threshold * 2 || ptsData->len > MEM_BUF_SIZE) {
    return;
  }

  char* tmp = realloc(ptsData->rawBuf, MEM_BUF_SIZE);
  if (tmp == NULL) {
    return;  // keep using the larger, still valid buffer
  }

  ptsData->rawBuf = tmp;
  ptsData->allocSize = MEM_BUF_SIZE;
}

/*
 * Make sure at least inputSize more bytes fit after the current content of the raw buffer.
 * Returns 0 on success, -1 if the buffer cannot be grown (the old buffer stays valid).
 */
static int32_t expandBuffer(STSList* ptsData, int32_t inputSize) {
  if (ptsData->allocSize - ptsData->len >= inputSize) {
    return 0;
  }

  int32_t newSize = inputSize + ptsData->len;
  char*   tmp = realloc(ptsData->rawBuf, (size_t)newSize);
  if (tmp == NULL) {
    return -1;
  }

  ptsData->rawBuf = tmp;
  ptsData->allocSize = newSize;
  return 0;
}

/*
 * Grow block.payload and assistBuf together so that both can serve a block of rawSize uncompressed
 * bytes. pTSBuf->bufSize records the capacity shared by the two buffers (allocResForTSBuf allocates
 * both with MEM_BUF_SIZE bytes). realloc keeps the existing payload content.
 * NOTE(review): the TSDB_KEYSIZE slack allows for a codec that falls back to storing the input
 * uncompressed behind a flag byte — confirm against tsCompressTimestamp.
 * Returns 0 on success, -1 on invalid size or allocation failure.
 */
static int32_t ensureBlockBufCapacity(STSBuf* pTSBuf, int32_t rawSize) {
  if (rawSize < 0) {
    return -1;
  }

  int64_t required = (int64_t)rawSize + (int64_t)TSDB_KEYSIZE;
  if (required <= (int64_t)pTSBuf->bufSize) {
    return 0;
  }

  if (required > (int64_t)INT32_MAX) {
    return -1;
  }

  void* payload = realloc(pTSBuf->block.payload, (size_t)required);
  if (payload == NULL) {
    return -1;
  }
  pTSBuf->block.payload = payload;

  void* assist = realloc(pTSBuf->assistBuf, (size_t)required);
  if (assist == NULL) {
    return -1;  // bufSize still describes the smaller capacity shared by both buffers
  }
  pTSBuf->assistBuf = assist;

  pTSBuf->bufSize = (int32_t)required;
  return 0;
}

/*
 * Compress the pending raw timestamps into one comp block, append it at the end of file and account
 * it to the last vnode. Does nothing when no data is pending.
 */
static void writeDataToDisk(STSBuf* pTSBuf) {
  STSList* pTsData = &pTSBuf->tsData;
  if (pTsData->len == 0) {
    return;
  }

  // normally a no-op: tsBufAppend reserved room for the pending data before copying it in
  if (ensureBlockBufCapacity(pTSBuf, pTsData->len) != 0) {
    return;  // never overflow the payload buffer; the data stays buffered
  }

  STSBlock* pBlock = &pTSBuf->block;

  pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE;
  pBlock->compLen = tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len / TSDB_KEYSIZE, pBlock->payload,
                                        pTSBuf->bufSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  (void)fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET);

  /*
   * format for output data:
   * 1. tags, number of ts, size after compressed, payload, size after compressed
   * 2. tags, number of ts, size after compressed, payload, size after compressed
   *
   * both side has the compressed length is used to support load data forwards/backwords.
   */
  fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
  fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);

  fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);

  fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);

  fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);

  int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
  pTSBuf->fileSize += blockSize;

  pTsData->len = 0;

  STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);

  pVnodeBlockInfoEx->info.compLen += blockSize;
  pVnodeBlockInfoEx->info.numOfBlocks += 1;

  shrinkBuffer(pTsData);
}

/**
 * Read one comp block at the current file position into pTSBuf->block.
 *
 * ASC: the position must be at the start of a block and is left at its end.
 * DESC: the position must be at the end of a block and is left at its start (the end of the previous block).
 *
 * @param decomp  also decompress the timestamps into pTSBuf->tsData
 * @return &pTSBuf->block, or NULL on a short read, seek failure, corrupted block or allocation failure
 */
STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
  STSBlock* pBlock = &pTSBuf->block;

  // clear the memory buffer, but keep the payload buffer
  void* tmp = pBlock->payload;
  memset(pBlock, 0, sizeof(STSBlock));
  pBlock->payload = tmp;

  long headerAndTrailer = (long)(sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag));

  if (order == TSDB_ORDER_DESC) {
    /*
     * set the right position for the reversed traverse, the reversed traverse is started from
     * the end of each comp data block
     */
    if (fseek(pTSBuf->f, -(long)sizeof(pBlock->padding), SEEK_CUR) != 0 ||
        fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f) != 1 || pBlock->padding < 0) {
      return NULL;
    }

    pBlock->compLen = pBlock->padding;

    long offset = (long)pBlock->compLen + headerAndTrailer;
    if (fseek(pTSBuf->f, -offset, SEEK_CUR) != 0) {
      return NULL;
    }
  }

  if (fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f) != 1 ||
      fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f) != 1 ||
      fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f) != 1) {
    return NULL;
  }

  if (pBlock->compLen < 0 || pBlock->numOfElem < 0) {
    return NULL;  // corrupted block header
  }

  // a block produced by another buffer (merge / comp blocks) may exceed the current payload capacity
  if (ensureBlockBufCapacity(pTSBuf, pBlock->compLen) != 0) {
    return NULL;
  }

  if (pBlock->compLen > 0 && fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f) != 1) {
    return NULL;
  }

  if (decomp) {
    int32_t rawSize = (int32_t)(pBlock->numOfElem * TSDB_KEYSIZE);
    if ((rawSize > pTSBuf->tsData.allocSize && expandBuffer(&pTSBuf->tsData, rawSize) != 0) ||
        ensureBlockBufCapacity(pTSBuf, rawSize) != 0) {
      return NULL;
    }

    pTSBuf->tsData.len =
        tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                              pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
  }

  // read the comp length at the end of the comp block
  if (fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f) != 1) {
    return NULL;
  }

  // for backwards traverse, set the start position at the end of previous block
  if (order == TSDB_ORDER_DESC) {
    long offset = (long)pBlock->compLen + headerAndTrailer;
    if (fseek(pTSBuf->f, -offset, SEEK_CUR) != 0) {
      return NULL;
    }
  }

  return pBlock;
}

/*
 * Set the order of the ts buffer if it has not been set yet (tsOrder == -1), by comparing the first
 * incoming key with the last buffered key, or with the second incoming key when nothing is buffered.
 * Must be called before ptsData->len is advanced past the new data.
 */
static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
  STSList* ptsData = &pTSBuf->tsData;

  if (pTSBuf->tsOrder != -1) {
    // todo the timestamp order is set, check the asc/desc order of appended data
    return TSDB_CODE_SUCCESS;
  }

  if (len < (int32_t)TSDB_KEYSIZE) {
    return TSDB_CODE_SUCCESS;  // no complete key to inspect
  }

  // memcpy: the incoming data carries no alignment guarantee
  TSKEY first = 0;
  memcpy(&first, pData, sizeof(first));

  if (ptsData->len > 0) {
    TSKEY lastKey = 0;
    memcpy(&lastKey, ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE, sizeof(lastKey));

    pTSBuf->tsOrder = (lastKey > first) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
  } else if (len > (int32_t)TSDB_KEYSIZE) {
    // no data in current vnode, more than one ts is added, check the orders
    TSKEY second = 0;
    memcpy(&second, pData + TSDB_KEYSIZE, sizeof(second));

    if (first < second) {
      pTSBuf->tsOrder = TSDB_ORDER_ASC;
    } else if (first > second) {
      pTSBuf->tsOrder = TSDB_ORDER_DESC;
    } else {
      // todo handle error
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Append len bytes of timestamps for (vnodeId, tag). Pending data is written as a comp block when the
 * vnode or tag changes, or when the raw buffer reaches its threshold. The read cursor is reset.
 * Nothing is appended if the required memory cannot be allocated.
 */
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) {
  STSVnodeBlockInfoEx* pBlockInfo = NULL;
  STSList*             ptsData = &pTSBuf->tsData;

  if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);

    pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId);
    if (pBlockInfo == NULL) {
      return;  // out of memory: the vnode index cannot be extended
    }
  } else {
    pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf);
  }

  assert(pBlockInfo->info.vnode == vnodeId);

  if (pTSBuf->block.tag != tag && ptsData->len > 0) {
    // new arrived data with different tags value, save current value into disk first
    writeDataToDisk(pTSBuf);
  }

  // always reserve room: writeDataToDisk may just have shrunk the raw buffer below len
  if (expandBuffer(ptsData, len) != 0 || ensureBlockBufCapacity(pTSBuf, ptsData->len + len) != 0) {
    return;
  }

  pTSBuf->block.tag = tag;
  memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);

  // todo check return value
  setCheckTSOrder(pTSBuf, pData, len);

  ptsData->len += len;
  pBlockInfo->len += len;

  pTSBuf->numOfTotal += len / TSDB_KEYSIZE;

  // the size of raw data exceeds the size of the default prepared buffer, so
  // during getBufBlock, the output buffer needs to be large enough.
  if (ptsData->len >= ptsData->threshold) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);
  }

  tsBufResetPos(pTSBuf);
}

/* Write pending data, persist the last vnode entry and the header, and sync the file to disk. */
void tsBufFlush(STSBuf* pTSBuf) {
  if (pTSBuf->tsData.len <= 0) {
    return;
  }

  writeDataToDisk(pTSBuf);
  shrinkBuffer(&pTSBuf->tsData);

  STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);

  // update prev vnode length info in file
  TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info);

  // save the ts order into header
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
  STSBufUpdateHeader(pTSBuf, &header);

  // fsync only sees what stdio has already handed to the kernel
  fflush(pTSBuf->f);
  fsync(fileno(pTSBuf->f));
}

/* Index of the first entry whose vnode id equals vnodeId, or -1 if there is none. */
static int32_t tsBufFindVnodeIndexFromId(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t numOfVnodes, int32_t vnodeId) {
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (pVnodeInfoEx[i].info.vnode == vnodeId) {
      return i;
    }
  }

  return -1;
}

/*
 * Read blocks of one vnode sequentially from its start until blockIndex has been loaded into
 * pTSBuf->block. For DESC traversal the file position is then moved back to the start of that block.
 * Returns 0 on success, -1 on seek/read failure.
 */
// todo opt performance by cache blocks info
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int32_t blockIndex) {
  if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
    return -1;
  }

  // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode
  for (int32_t i = 0; i <= blockIndex; ++i) {
    if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, false) == NULL) {
      return -1;
    }
  }

  // set the file position to be the end of previous comp block
  if (pTSBuf->cur.order == TSDB_ORDER_DESC) {
    STSBlock* pBlock = &pTSBuf->block;
    int32_t   compBlockSize =
        pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
    fseek(pTSBuf->f, -compBlockSize, SEEK_CUR);
  }

  return 0;
}

/*
 * Scan the blocks of one vnode in the cursor's traverse order and stop at the first block whose tag
 * equals tag. Returns that block's position in the scan, or -1 if not found / on read failure.
 */
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) {
  int64_t offset = 0;

  if (pTSBuf->cur.order == TSDB_ORDER_ASC) {
    offset = pBlockInfo->offset;
  } else {  // reversed traverse starts from the end of block
    offset = pBlockInfo->offset + pBlockInfo->compLen;
  }

  if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) {
    return -1;
  }

  for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) {
    if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, false) == NULL) {
      return -1;
    }

    if (pTSBuf->block.tag == tag) {
      return i;
    }
  }

  return -1;
}

/*
 * Load and decompress block blockIndex of vnode vnodeIndex into pTSBuf->block / pTSBuf->tsData and
 * move the cursor to its first element in traverse order.
 */
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex) {
  STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[vnodeIndex].info;
  if (pBlockInfo->numOfBlocks <= blockIndex) {
    assert(false);
    return;
  }

  STSCursor* pCur = &pTSBuf->cur;
  bool       continueFromCursor = pCur->vgroupIndex == vnodeIndex &&
                            ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
                             (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC));

  if (continueFromCursor) {
    // keep reading from the current file position, left behind the cursor's block by the previous read
    int32_t step = abs(blockIndex - pCur->blockIndex);
    for (int32_t i = 0; i < step; ++i) {
      if (readDataFromDisk(pTSBuf, pCur->order, false) == NULL) {
        return;
      }
    }
  } else if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) {
    assert(false);
    return;
  }

  STSBlock* pBlock = &pTSBuf->block;

  int32_t rawSize = (int32_t)(pBlock->numOfElem * TSDB_KEYSIZE);

  /*
   * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
   * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
   */
  if (rawSize > pTSBuf->tsData.allocSize && expandBuffer(&pTSBuf->tsData, rawSize) != 0) {
    return;
  }

  if (ensureBlockBufCapacity(pTSBuf, rawSize) != 0) {
    return;
  }

  pTSBuf->tsData.len =
      tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                            pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) &&
         (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));

  pCur->vgroupIndex = vnodeIndex;
  pCur->blockIndex = blockIndex;

  pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
}

/* Block info of the first vnode entry with the given id, or NULL if there is none. */
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) {
  int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);
  if (j == -1) {
    return NULL;
  }

  return &pTSBuf->pData[j].info;
}

/* Overwrite the file header at offset 0. Returns 0 on success, -1 on invalid input or I/O failure. */
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
  if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->numOfVnode < 0 || pHeader->magic != TS_COMP_FILE_MAGIC) {
    return -1;
  }

  if (fseek(pTSBuf->f, 0, SEEK_SET) != 0) {
    return -1;
  }

  if (fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f) != 1) {
    return -1;
  }

  return 0;
}

/**
 * Advance the cursor to the next timestamp in traverse order, crossing block and vnode boundaries.
 * A reset cursor (vgroupIndex == -1) moves to the first element (ASC) or the last element (DESC).
 *
 * @return true if the cursor now points to an element, false when the traversal is exhausted
 */
bool tsBufNextPos(STSBuf* pTSBuf) {
  if (pTSBuf == NULL || pTSBuf->numOfVnodes == 0) {
    return false;
  }

  STSCursor* pCur = &pTSBuf->cur;

  // get the first/last position according to traverse order
  if (pCur->vgroupIndex == -1) {
    if (pCur->order == TSDB_ORDER_ASC) {
      tsBufGetBlock(pTSBuf, 0, 0);
    } else {
      // get the last timestamp record in the last block of the last vnode
      int32_t vnodeIndex = pTSBuf->numOfVnodes - 1;
      pCur->vgroupIndex = vnodeIndex;

      int32_t blockIndex = pTSBuf->pData[vnodeIndex].info.numOfBlocks - 1;
      tsBufGetBlock(pTSBuf, vnodeIndex, blockIndex);

      pCur->tsIndex = pTSBuf->block.numOfElem - 1;
    }

    if (pTSBuf->block.numOfElem == 0) {  // the whole list is empty, return
      tsBufResetPos(pTSBuf);
      return false;
    }

    return true;
  }

  int32_t step = (pCur->order == TSDB_ORDER_ASC) ? 1 : -1;

  assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE);

  bool atBlockEnd = (pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
                    (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0);
  if (!atBlockEnd) {
    pCur->tsIndex += step;
    return true;
  }

  // use the cursor's own entry: vnode ids are not guaranteed to be unique in the index
  STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pCur->vgroupIndex].info;

  bool atVnodeEnd = (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
                    (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC);
  if (!atVnodeEnd) {
    tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step);
    return true;
  }

  bool atLastVnode = (pCur->vgroupIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSDB_ORDER_ASC) ||
                     (pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC);
  if (atLastVnode) {
    pCur->vgroupIndex = -1;
    return false;
  }

  // enter the adjacent vnode at its first block (ASC) or at its own last block (DESC)
  int32_t nextVnodeIndex = pCur->vgroupIndex + step;
  int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pTSBuf->pData[nextVnodeIndex].info.numOfBlocks - 1;
  tsBufGetBlock(pTSBuf, nextVnodeIndex, blockIndex);

  return true;
}

/* Reset the cursor to "before the first element" while keeping the traverse order. */
void tsBufResetPos(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return;
  }

  pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order};
}

/* Element under the cursor; vnode is -1 when the buffer is NULL or the cursor is not positioned. */
STSElem tsBufGetElem(STSBuf* pTSBuf) {
  STSElem elem1 = {.vnode = -1};

  if (pTSBuf == NULL) {
    return elem1;
  }

  STSCursor* pCur = &pTSBuf->cur;
  if (pCur->vgroupIndex < 0) {
    return elem1;
  }

  STSBlock* pBlock = &pTSBuf->block;

  elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
  elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
  elem1.tag = pBlock->tag;

  return elem1;
}

/**
 * Append the comp data of pSrcBuf (which must hold a single vnode) to pDestBuf under vnodeId.
 * Only data already written to pSrcBuf's file is merged; unflushed data in pSrcBuf->tsData is not copied.
 *
 * @param pDestBuf  destination buffer; its pending data is flushed first
 * @param pSrcBuf   source buffer with exactly one vnode
 * @param vnodeId   vnode id recorded for the merged data
 * @return 0 on success (or when there is nothing to merge), -1 on error
 */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
  if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) {
    return 0;
  }

  if (pDestBuf->numOfVnodes + pSrcBuf->numOfVnodes > TS_COMP_FILE_VNODE_MAX) {
    return -1;
  }

  // src can only have one vnode index
  if (pSrcBuf->numOfVnodes > 1) {
    return -1;
  }

  // there are data in buffer, flush to disk first
  tsBufFlush(pDestBuf);

  // compared with the last vnode id; an empty destination always gets a new entry
  if (pDestBuf->numOfVnodes == 0 || vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) {
    int32_t oldSize = pDestBuf->numOfVnodes;
    int32_t newSize = oldSize + pSrcBuf->numOfVnodes;

    if (pDestBuf->numOfAlloc < newSize) {
      STSVnodeBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize);
      if (tmp == NULL) {
        return -1;
      }

      pDestBuf->pData = tmp;
      pDestBuf->numOfAlloc = newSize;  // only after the allocation succeeded
    }

    // directly copy the vnode index information
    memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfVnodes * sizeof(STSVnodeBlockInfoEx));

    // set the new offset value
    for (int32_t i = 0; i < pSrcBuf->numOfVnodes; ++i) {
      STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
      pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
      pBlockInfoEx->info.vnode = vnodeId;
    }

    pDestBuf->numOfVnodes = newSize;
  } else {
    STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf);

    pBlockInfoEx->len += pSrcBuf->pData[0].len;
    pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
    pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
    pBlockInfoEx->info.vnode = vnodeId;
  }

  if (fseek(pDestBuf->f, 0, SEEK_END) != 0) {
    return -1;
  }

  int64_t offset = getDataStartOffset();
  int32_t size = pSrcBuf->fileSize - offset;

#ifdef LINUX
  ssize_t rc = tsendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size);
#else
  ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size);
#endif

  // a failed (-1) or short copy both leave the merged index pointing at missing data
  if (rc == -1 || rc != size) {
    // tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
    return -1;
  }

  pDestBuf->numOfTotal += pSrcBuf->numOfTotal;

  uint32_t oldFileSize = pDestBuf->fileSize;

  struct stat fileStat;
  if (fstat(fileno(pDestBuf->f), &fileStat) != 0) {
    return -1;
  }

  pDestBuf->fileSize = (uint32_t)fileStat.st_size;
  assert(pDestBuf->fileSize == oldFileSize + size);

  return 0;
}

/**
 * Create an auto-deleted ts buffer holding a single vnode (id 0) whose comp blocks are copied verbatim
 * from pData.
 *
 * @param pData        serialized comp blocks, len bytes
 * @param numOfBlocks  number of comp blocks contained in pData
 * @param len          size of pData in bytes
 * @param order        TSDB_ORDER_ASC or TSDB_ORDER_DESC
 * @return the new buffer, or NULL on allocation or I/O failure
 */
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) {
  STSBuf* pTSBuf = tsBufCreate(true);
  if (pTSBuf == NULL) {
    return NULL;
  }

  STSVnodeBlockInfoEx* pBlockInfoEx = addOneVnodeInfo(pTSBuf, 0);
  if (pBlockInfoEx == NULL) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  STSVnodeBlockInfo* pBlockInfo = &pBlockInfoEx->info;
  pBlockInfo->numOfBlocks = numOfBlocks;
  pBlockInfo->compLen = len;
  pBlockInfo->offset = getDataStartOffset();
  pBlockInfo->vnode = 0;

  // update prev vnode length info in file
  TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);

  if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0 ||
      (len > 0 && fwrite(pData, 1, (size_t)len, pTSBuf->f) != (size_t)len)) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  pTSBuf->fileSize += len;

  pTSBuf->tsOrder = order;
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
  STSBufUpdateHeader(pTSBuf, &header);

  // fsync only sees what stdio has already handed to the kernel
  fflush(pTSBuf->f);
  fsync(fileno(pTSBuf->f));

  return pTSBuf;
}

/*
 * Position the cursor on the first block (in traverse order) of vnodeId whose tag equals tag and return
 * the element under the cursor; vnode is -1 in the result when no such block exists.
 */
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) {
  STSElem elem = {.vnode = -1};

  if (pTSBuf == NULL) {
    return elem;
  }

  int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);
  if (j == -1) {
    return elem;
  }

  // for debug purpose
  //  tsBufDisplay(pTSBuf);

  STSCursor*         pCur = &pTSBuf->cur;
  STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[j].info;

  int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag);
  if (blockIndex < 0) {
    return elem;
  }

  // the found block is already loaded in pTSBuf->block; tsBufGetBlock only needs to decompress it
  pCur->vgroupIndex = j;
  pCur->blockIndex = blockIndex;
  tsBufGetBlock(pTSBuf, j, blockIndex);

  return tsBufGetElem(pTSBuf);
}

/* Copy of the current cursor; vgroupIndex is -1 when pTSBuf is NULL. */
STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
  STSCursor c = {.vgroupIndex = -1};
  if (pTSBuf == NULL) {
    return c;
  }

  return pTSBuf->cur;
}

/* Restore a cursor obtained from tsBufGetCursor, reloading its block when it is positioned. */
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
  if (pTSBuf == NULL || pCur == NULL) {
    return;
  }

  if (pCur->vgroupIndex != -1) {
    tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
  }

  pTSBuf->cur = *pCur;
}

/* Set the traverse order (TSDB_ORDER_ASC / TSDB_ORDER_DESC) used by the cursor. */
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
  if (pTSBuf == NULL) {
    return;
  }

  pTSBuf->cur.order = order;
}

/* Open a second handle on the same file; the clone never deletes the file. */
STSBuf* tsBufClone(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return NULL;
  }

  return tsBufCreateFromFile(pTSBuf->path, false);
}

/* Debug helper: print every element as "vnode-tag-ts" in ascending order, then restore the traverse order. */
void tsBufDisplay(STSBuf* pTSBuf) {
  printf("-------start of ts comp file-------\n");
  printf("number of vnode:%d\n", pTSBuf->numOfVnodes);

  int32_t old = pTSBuf->cur.order;
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  tsBufResetPos(pTSBuf);

  while (tsBufNextPos(pTSBuf)) {
    STSElem elem = tsBufGetElem(pTSBuf);
    // the element stores the tag value itself (copied from block.tag), not an address
    printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, (int64_t)elem.tag, elem.ts);
  }

  pTSBuf->cur.order = old;
  printf("-------end of ts comp file-------\n");
}

/* File offset of the first comp block: header plus the fixed-size vnode index area. */
static int32_t getDataStartOffset(void) {
  return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo);
}

/* Write one vnode index entry at offset; the entry must lie entirely inside the index area. */
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) {
  if (offset < 0 || offset + (int64_t)sizeof(STSVnodeBlockInfo) > getDataStartOffset()) {
    return -1;
  }

  if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) {
    return -1;
  }

  if (fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f) != 1) {
    return -1;
  }

  return 0;
}

// update prev vnode length info in file
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) {
  int64_t offset = (int64_t)sizeof(STSBufFileHeader) + (int64_t)index * (int64_t)sizeof(STSVnodeBlockInfo);
  doUpdateVnodeInfo(pTSBuf, offset, pBlockInfo);
}

/*
 * Allocate the in-memory buffers and account the header/index area in fileSize.
 * On failure pTSBuf is destroyed (tsBufDestory) and NULL is returned.
 */
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
  const int32_t INITIAL_VNODEINFO_SIZE = 4;

  pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE;
  pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx));
  if (pTSBuf->pData == NULL) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE);
  if (pTSBuf->tsData.rawBuf == NULL) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  // bufSize is the capacity shared by assistBuf and block.payload (see ensureBlockBufCapacity)
  pTSBuf->bufSize = MEM_BUF_SIZE;
  pTSBuf->tsData.threshold = MEM_BUF_SIZE;
  pTSBuf->tsData.allocSize = MEM_BUF_SIZE;

  pTSBuf->assistBuf = malloc(MEM_BUF_SIZE);
  if (pTSBuf->assistBuf == NULL) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  pTSBuf->block.payload = malloc(MEM_BUF_SIZE);
  if (pTSBuf->block.payload == NULL) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  pTSBuf->fileSize += getDataStartOffset();
  return pTSBuf;
}