// tsdbCommit2.c (committed by Hongze Cheng)
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

// Commit handle passed to every commit helper in this file.
typedef struct {
  SMemTable *pMemTable;  // memtable whose aMemData array tsdbCommitToFile() walks
  SArray    *aBlkIdx;    // not referenced in this file yet; presumably block-index entries -- TODO confirm
} SCommitH;

// Forward declarations of the file-local commit steps called by tsdbCommit2().
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb);
static int32_t tsdbEndCommit(SCommitH *pCHandle);
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid);

/**
 * Create the memtable for a TSDB instance.
 *
 * pTsdb->mem must be NULL on entry; its address is handed to
 * tsdbMemTableCreate2().
 *
 * @param pTsdb TSDB instance to begin.
 * @return 0 on success, otherwise the code returned by tsdbMemTableCreate2()
 *         (the failure is also logged).
 */
int32_t tsdbBegin2(STsdb *pTsdb) {
  ASSERT(pTsdb->mem == NULL);

  int32_t rc = tsdbMemTableCreate2(pTsdb, (SMemTable **)&pTsdb->mem);
  if (rc != 0) {
    tsdbError("vgId:%d failed to begin TSDB since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  }

  return rc;
}

/**
 * Run one commit: start the commit, call tsdbCommitToFile() for every file id
 * in [sfid, efid], then end the commit.
 *
 * @param pTsdb TSDB instance to commit.
 * @return 0 on success, otherwise the code of the first step that failed.
 */
int32_t tsdbCommit2(STsdb *pTsdb) {
  int32_t  code = 0;
  SCommitH ch = {0};
  // TODO: derive the fid range from the data being committed. Until then the
  // range is deliberately empty (sfid > efid) so the loop below is skipped
  // instead of iterating over indeterminate values.
  int32_t  sfid = 0;
  int32_t  efid = -1;

  // start to commit
  code = tsdbStartCommit(&ch, pTsdb);
  if (code) {
    goto _exit;
  }

  // commit
  for (int32_t fid = sfid; fid <= efid; fid++) {
    code = tsdbCommitToFile(&ch, fid);
    if (code) {
      goto _err;
    }
  }

  // end commit
  code = tsdbEndCommit(&ch);

_exit:
  return code;

_err:
  // TODO: rollback
  return code;
}

/**
 * Begin a commit: move pTsdb->mem into pTsdb->imem, clear pTsdb->mem, and
 * attach the moved memtable to the commit handle.
 *
 * @param pCHandle Commit handle to populate.
 * @param pTsdb    TSDB instance; pTsdb->mem must be set and pTsdb->imem must be NULL.
 * @return 0 (there is no failure path yet).
 */
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) {
  int32_t code = 0;

  ASSERT(pTsdb->imem == NULL && pTsdb->mem);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;

  // tsdbCommitToFile() reads pCHandle->pMemTable and tsdbCommit2() passes in a
  // zeroed handle, so point the handle at the memtable being committed.
  // NOTE(review): the cast mirrors the one in tsdbBegin2() -- confirm that
  // pTsdb->imem really holds an SMemTable.
  pCHandle->pMemTable = (SMemTable *)pTsdb->imem;

  // TODO
  return code;
}

/**
 * Finish a commit. Currently a stub that does nothing.
 *
 * @param pCHandle Commit handle (unused for now).
 * @return Always 0.
 */
static int32_t tsdbEndCommit(SCommitH *pCHandle) {
  // TODO
  return 0;
}

/**
 * Commit the data of one table. Currently a stub that does nothing.
 *
 * @param pCHandle  Commit handle (unused for now).
 * @param pMemData  In-memory data of the table, or NULL (unused for now).
 * @param pBlockIdx Block-index entry of the table, or NULL (unused for now).
 * @return Always 0.
 */
static int32_t tsdbCommitTableData(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx *pBlockIdx) {
  // TODO
  return 0;
}

/**
 * Order two TABLEID values by `suid` first and by `uid` second.
 *
 * @param p1 Pointer to the first TABLEID.
 * @param p2 Pointer to the second TABLEID.
 * @return -1 if *p1 sorts before *p2, 1 if it sorts after, 0 if both fields
 *         are equal.
 */
static int32_t tsdbTableIdCmprFn(const void *p1, const void *p2) {
  // Keep the const qualifier: the comparator only reads its arguments.
  const TABLEID *pId1 = (const TABLEID *)p1;
  const TABLEID *pId2 = (const TABLEID *)p2;

  if (pId1->suid < pId2->suid) {
    return -1;
  } else if (pId1->suid > pId2->suid) {
    return 1;
  }

  if (pId1->uid < pId2->uid) {
    return -1;
  } else if (pId1->uid > pId2->uid) {
    return 1;
  }

  return 0;
}
/**
 * Commit the in-memory data that belongs to one file.
 *
 * First scans the memtable's aMemData array for the first table that has a
 * row inside [fidSKey, fidEKey]; if none is found the function returns
 * without doing anything. Otherwise it walks the remaining aMemData entries
 * and the block-index entries in parallel, ordered by tsdbTableIdCmprFn(),
 * and calls tsdbCommitTableData() once per table.
 *
 * @param pCHandle Commit handle; pCHandle->pMemTable must be set.
 * @param fid      File id to commit (not used yet, see the TODO below).
 * @return 0 on success or when there is nothing to commit, otherwise the
 *         code returned by tsdbCommitTableData().
 */
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) {
  int32_t      code = 0;
  SMemDataIter iter = {0};
  TSDBROW     *pRow = NULL;
  int8_t       hasData = 0;
  // TODO: derive the key range covered by `fid`. Zero-initialized so the range
  // checks below never read indeterminate values.
  TSKEY        fidSKey = 0;
  TSKEY        fidEKey = 0;
  int32_t      iMemData = 0;
  int32_t      nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData);
  int32_t      iBlockIdx = 0;
  int32_t      nBlockIdx;

  // check if there are data in the time range
  for (; iMemData < nMemData; iMemData++) {
    // NOTE(review): pMemData is fetched but never handed to the iterator calls
    // below -- confirm how the iterator is bound to this table.
    SMemData *pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
    tsdbMemDataIterOpen(&iter, &(TSDBKEY){.ts = fidSKey, .version = 0}, 0);

    // Reset so a row left over from the previous table is never re-examined,
    // and guard the dereference in case the iterator yields no row.
    pRow = NULL;
    tsdbMemDataIterGet(&iter, &pRow);

    if (pRow && pRow->tsRow.ts >= fidSKey && pRow->tsRow.ts <= fidEKey) {
      hasData = 1;
      break;
    }
  }

  if (!hasData) return code;

  // create or open the file to commit(todo)

  // loop to commit each table data
  // iMemData deliberately continues from the first table found above.
  nBlockIdx = 0;
  while (iBlockIdx < nBlockIdx || iMemData < nMemData) {
    SMemData  *pMemData = NULL;
    SBlockIdx *pBlockIdx = NULL;
    if (iMemData < nMemData) {
      // Index with the memdata cursor, not the block-index cursor.
      pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
    }
    if (iBlockIdx < nBlockIdx) {
      // pBlockIdx
    }

    if (pMemData && pBlockIdx) {
      int32_t c = tsdbTableIdCmprFn(&(TABLEID){.suid = pMemData->suid, .uid = pMemData->uid},
                                    &(TABLEID){.suid = pBlockIdx->suid, .uid = pBlockIdx->uid});
      if (c == 0) {
        // same table on both sides: commit them together
        iMemData++;
        iBlockIdx++;
      } else if (c < 0) {
        // the memdata table sorts first: commit it alone
        pBlockIdx = NULL;
        iMemData++;
      } else {
        // the block-index table sorts first: commit it alone
        pMemData = NULL;
        iBlockIdx++;
      }
    } else {
      if (pMemData) {
        iMemData++;
      } else {
        iBlockIdx++;
      }
    }

    code = tsdbCommitTableData(pCHandle, pMemData, pBlockIdx);
    if (code) {
      goto _err;
    }
  }

  return code;

_err:
  return code;
}