/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

// Working state for one commit, passed between the commit helpers below.
typedef struct {
  SMemTable *pMemTable;  // memtable being committed; set by tsdbStartCommit()
  SArray    *aBlkIdx;    // not referenced by the functions visible in this file
} SCommitH;

static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb);
static int32_t tsdbEndCommit(SCommitH *pCHandle);
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid);

/**
 * Create the memtable stored in pTsdb->mem.
 *
 * Asserts that pTsdb->mem is NULL on entry.
 *
 * @param pTsdb TSDB instance to prepare.
 * @return 0 on success, otherwise the code returned by tsdbMemTableCreate2()
 *         (which is also logged).
 */
int32_t tsdbBegin2(STsdb *pTsdb) {
  int32_t code = 0;

  ASSERT(pTsdb->mem == NULL);
  code = tsdbMemTableCreate2(pTsdb, (SMemTable **)&pTsdb->mem);
  if (code) {
    tsdbError("vgId:%d failed to begin TSDB since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    goto _exit;
  }

_exit:
  return code;
}

/**
 * Run a commit: start it, commit every file id in [sfid, efid], then end it.
 *
 * @param pTsdb TSDB instance to commit.
 * @return 0 on success, otherwise the first non-zero code returned by a
 *         commit step.
 */
int32_t tsdbCommit2(STsdb *pTsdb) {
  int32_t  code = 0;
  SCommitH ch = {0};
  // TODO: compute the real file-id range to commit. Until then the range is
  // deliberately empty (sfid > efid): the original read both variables
  // uninitialized, which is undefined behaviour.
  int32_t sfid = 0;
  int32_t efid = -1;

  // start to commit
  code = tsdbStartCommit(&ch, pTsdb);
  if (code) {
    goto _exit;
  }

  // commit
  for (int32_t fid = sfid; fid <= efid; fid++) {
    code = tsdbCommitToFile(&ch, fid);
    if (code) {
      goto _err;
    }
  }

  // end commit
  code = tsdbEndCommit(&ch);
  if (code) {
    goto _exit;
  }

_exit:
  return code;

_err:
  // TODO: rollback
  return code;
}

/**
 * Move pTsdb->mem to pTsdb->imem and record it in the commit handle.
 *
 * Asserts that pTsdb->imem is NULL and pTsdb->mem is non-NULL on entry.
 *
 * @param pCHandle Commit handle to initialise.
 * @param pTsdb    TSDB instance being committed.
 * @return Always 0 for now.
 */
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) {
  int32_t code = 0;

  ASSERT(pTsdb->imem == NULL && pTsdb->mem);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;

  // tsdbCommitToFile() reads pCHandle->pMemTable; without this assignment it
  // would dereference the NULL left by the caller's zero-initialisation.
  // The cast mirrors the one tsdbBegin2() applies to pTsdb->mem.
  pCHandle->pMemTable = (SMemTable *)pTsdb->imem;

  // TODO
  return code;
}

/**
 * Finish a commit. Currently a stub.
 *
 * @param pCHandle Commit handle (unused for now).
 * @return Always 0 for now.
 */
static int32_t tsdbEndCommit(SCommitH *pCHandle) {
  int32_t code = 0;
  // TODO
  return code;
}

/**
 * Commit the memtable data that belongs to file `fid`.
 *
 * First scans the memtable's aMemData entries for one whose current row
 * falls inside [fidSKey, fidEKey]; returns immediately when none does.
 * Otherwise walks the remaining memtable entries (and, once implemented, the
 * block-index entries). The actual write is still to be done.
 *
 * @param pCHandle Commit handle; pCHandle->pMemTable must be set.
 * @param fid      File id to commit (not used yet, see TODO below).
 * @return Always 0 for now.
 */
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) {
  int32_t      code = 0;
  SMemDataIter iter = {0};
  TSDBROW     *pRow = NULL;
  int8_t       hasData = 0;
  // TODO: derive the key range from fid. Zero-initialised so the comparisons
  // below are well-defined instead of reading indeterminate values.
  TSKEY   fidSKey = 0;
  TSKEY   fidEKey = 0;
  int32_t iMemData = 0;
  int32_t nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData);
  int32_t iBlockIdx = 0;
  int32_t nBlockIdx;

  // check if there are data in the time range
  for (; iMemData < nMemData; iMemData++) {
    SMemData *pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);

    // NOTE(review): pMemData is fetched but never handed to the iterator;
    // confirm against the tsdbMemDataIterOpen() declaration whether it should
    // be an argument here.
    pRow = NULL;  // do not carry a row over from the previous table
    tsdbMemDataIterOpen(&iter, &(TSDBKEY){.ts = fidSKey, .version = 0}, 0);
    tsdbMemDataIterGet(&iter, &pRow);

    // Guard against the iterator not producing a row.
    if (pRow != NULL && pRow->tsRow.ts >= fidSKey && pRow->tsRow.ts <= fidEKey) {
      hasData = 1;
      break;
    }
  }

  if (!hasData) return code;

  // loop to commit each table data
  nBlockIdx = 0;
  for (;;) {
    if (iBlockIdx >= nBlockIdx && iMemData >= nMemData) break;

    SMemData  *pMemData = NULL;
    SBlockIdx *pBlockIdx = NULL;

    if (iMemData < nMemData) {
      // Index with the memtable cursor; the original used iBlockIdx here.
      pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
    }

    if (iBlockIdx < nBlockIdx) {
      // pBlockIdx
    }

    // TODO: merge pMemData / pBlockIdx and write the table data.

    // Until the merge logic decides which side to consume, advance every
    // cursor that still has an entry. The original advanced neither, so this
    // loop never terminated once hasData was set.
    if (pMemData != NULL) iMemData++;
    if (iBlockIdx < nBlockIdx) iBlockIdx++;
  }

  return code;
}