metaTable.c 25.8 KB
Newer Older
H
more  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
refact  
Hongze Cheng 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "meta.h"
H
refact  
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19 20 21 22 23 24
/* Forward declarations of the file-local helpers used by the public entry points below. */

/* entry dispatch */
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);

/* primary stores */
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);

/* secondary indexes */
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
H
Hongze Cheng 已提交
26 27

/*
 * Create a super table entry in the meta store.
 *
 * When a table with the requested name can already be looked up, the call is
 * currently a no-op that reports success: the "table already exists" error
 * path is compiled out (see the TODO below).
 *
 * pMeta   - meta handle the entry is written to
 * version - version recorded in the new entry
 * pReq    - create request supplying name, suid, column schema and tag schema
 *
 * Returns 0 on success (or when the name already exists), -1 when
 * metaHandleEntry() fails; the failure is logged together with terrno.
 */
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
  SMetaEntry  me = {0};
  SMetaReader mr = {0};

  // validate req
  metaReaderInit(&mr, pMeta, 0);
  if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
// TODO: just for pass case
#if 0
    terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
    metaReaderClear(&mr);
    return -1;
#else
    metaReaderClear(&mr);
    return 0;
#endif
  }
  metaReaderClear(&mr);

  // set structs
  me.version = version;
  me.type = TSDB_SUPER_TABLE;
  me.uid = pReq->suid;
  me.name = pReq->name;
  me.stbEntry.schema = pReq->schema;
  me.stbEntry.schemaTag = pReq->schemaTag;

  if (metaHandleEntry(pMeta, &me) < 0) goto _err;

  metaDebug("vgId:%d super table is created, name:%s uid: %" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid);

  return 0;

_err:
  metaError("vgId:%d failed to create super table: %s uid: %" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name,
            pReq->suid, tstrerror(terrno));
  return -1;
}

/*
 * Drop a super table together with all of its child tables.
 *
 * The super table is looked up by name in the name index and must map to
 * pReq->suid. The uids of its child tables are collected from the child-table
 * index first; the actual deletions then happen under the meta write lock.
 *
 * pMeta   - meta handle
 * verison - not used by this function
 * pReq    - drop request carrying the super table name and suid
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_VND_TABLE_NOT_EXIST when the name is unknown or maps to a
 * different uid, or to TSDB_CODE_OUT_OF_MEMORY when the uid list cannot be
 * allocated.
 */
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
  void   *pKey = NULL;
  int     nKey = 0;
  void   *pData = NULL;
  int     nData = 0;
  int     c = 0;
  int     rc = 0;
  TBC    *pCtbIdxc = NULL;
  SArray *pArray = NULL;

  // check if super table exists
  rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
  if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) {
    // the lookup buffer must be released on the uid-mismatch path as well
    tdbFree(pData);
    terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
    return -1;
  }

  // collect the uids of all child tables
  pArray = taosArrayInit(8, sizeof(tb_uid_t));
  if (pArray == NULL) {
    tdbFree(pData);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
  rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
  if (rc >= 0) {
    for (;;) {
      rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, NULL, NULL);
      if (rc < 0) break;

      if (((SCtbIdxKey *)pKey)->suid < pReq->suid) {
        continue;  // cursor is still before the first key of this super table
      } else if (((SCtbIdxKey *)pKey)->suid > pReq->suid) {
        break;  // walked past the last child of this super table
      }

      taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid));
    }
  }
  // when the positioning failed the list simply stays empty
  tdbTbcClose(pCtbIdxc);

  metaWLock(pMeta);

  // drop all child tables
  for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) {
    tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild);
    metaDropTableByUid(pMeta, uid);
  }

  taosArrayDestroy(pArray);

  // drop super table
  rc = tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
  if (rc >= 0) {
    tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = *(int64_t *)pData, .uid = pReq->suid}, sizeof(STbDbKey),
                &pMeta->txn);
  } else {
    // without a uid index entry the version is unknown, so pData must not be read as one
    metaError("vgId:%d super table %s uid:%" PRId64 " has no uid index entry", TD_VID(pMeta->pVnode), pReq->name,
              pReq->suid);
  }
  tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
  tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);

  metaULock(pMeta);

  tdbFree(pKey);
  tdbFree(pData);
  metaDebug("vgId:%d  super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
  return 0;
}

H
Hongze Cheng 已提交
141 142 143
/*
 * Replace the stored definition of a super table with the one in pReq.
 *
 * The current entry is located through the uid index and decoded so that its
 * column schema can be compared with the requested one. A new schema record
 * is written only when both the schema version and the column count differ.
 * The new entry is then stored in table.db and the uid index is pointed at
 * `version`.
 *
 * pMeta   - meta handle
 * version - version recorded for the new entry
 * pReq    - request carrying suid, name, column schema and tag schema
 *
 * Returns 0 on success, -1 when the uid index lookup fails or the copy of the
 * old entry cannot be allocated.
 */
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
  SMetaEntry  oStbEntry = {0};
  SMetaEntry  nStbEntry = {0};
  TBC        *pUidIdxc = NULL;
  TBC        *pTbDbc = NULL;
  const void *pData;
  int         nData;
  int64_t     oversion;
  SDecoder    dc = {0};
  int32_t     ret;
  int32_t     c;

  // locate the current version of the super table
  tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
  ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
  if (ret < 0 || c) {
    ASSERT(0);
    tdbTbcClose(pUidIdxc);  // do not leak the cursor on the error path
    return -1;
  }

  ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
  if (ret < 0) {
    ASSERT(0);
    tdbTbcClose(pUidIdxc);
    return -1;
  }

  oversion = *(int64_t *)pData;

  // fetch the current entry from table.db
  tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
  ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
  ASSERT(ret == 0 && c == 0);

  ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
  ASSERT(ret == 0);

  // decode from a private copy of the stored bytes
  oStbEntry.pBuf = taosMemoryMalloc(nData);
  if (oStbEntry.pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tdbTbcClose(pTbDbc);
    tdbTbcClose(pUidIdxc);
    return -1;
  }
  memcpy(oStbEntry.pBuf, pData, nData);
  tDecoderInit(&dc, oStbEntry.pBuf, nData);
  metaDecodeEntry(&dc, &oStbEntry);

  nStbEntry.version = version;
  nStbEntry.type = TSDB_SUPER_TABLE;
  nStbEntry.uid = pReq->suid;
  nStbEntry.name = pReq->name;
  nStbEntry.stbEntry.schema = pReq->schema;
  nStbEntry.stbEntry.schemaTag = pReq->schemaTag;

  metaWLock(pMeta);
  // compare two entry
  if (oStbEntry.stbEntry.schema.sver != pReq->schema.sver) {
    if (oStbEntry.stbEntry.schema.nCols != pReq->schema.nCols) {
      metaSaveToSkmDb(pMeta, &nStbEntry);
    }
  }

  // if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) {
  //   // change tag schema
  // }

  // update table.db
  metaSaveToTbDb(pMeta, &nStbEntry);

  // update uid index
  tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);

  metaULock(pMeta);

  // release the decoder before the buffer it was initialised on
  tDecoderClear(&dc);
  taosMemoryFree(oStbEntry.pBuf);
  tdbTbcClose(pTbDbc);
  tdbTbcClose(pUidIdxc);
  return 0;
}

H
Hongze Cheng 已提交
213
/*
 * Create a child table or a normal table from a create request.
 *
 * Any other table type is rejected with TSDB_CODE_INVALID_MSG. When the name
 * is already taken, the uid of the existing table (and, for child-table
 * requests, its suid) is written back into pReq, terrno is set to
 * TSDB_CODE_TDB_TABLE_ALREADY_EXIST and -1 is returned without logging.
 *
 * Returns 0 on success, -1 on failure.
 */
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
  SMetaEntry  entry = {0};
  SMetaReader reader = {0};
  int         exists = 0;

  // validate message
  if (!(pReq->type == TSDB_CHILD_TABLE || pReq->type == TSDB_NORMAL_TABLE)) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  // validate req
  metaReaderInit(&reader, pMeta, 0);
  exists = (metaGetTableEntryByName(&reader, pReq->name) == 0);
  if (exists) {
    // report the identity of the existing table back to the caller
    pReq->uid = reader.me.uid;
    if (pReq->type == TSDB_CHILD_TABLE) {
      pReq->ctb.suid = reader.me.ctbEntry.suid;
    }
    terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
  }
  metaReaderClear(&reader);
  if (exists) {
    return -1;
  }

  // build SMetaEntry
  entry.version = version;
  entry.type = pReq->type;
  entry.uid = pReq->uid;
  entry.name = pReq->name;
  if (entry.type != TSDB_CHILD_TABLE) {
    entry.ntbEntry.ctime = pReq->ctime;
    entry.ntbEntry.ttlDays = pReq->ttl;
    entry.ntbEntry.schema = pReq->ntb.schema;

    // next column id: one past the id of the last column in the schema
    const SSchemaWrapper *pSW = &entry.ntbEntry.schema;
    entry.ntbEntry.ncid = pSW->pSchema[pSW->nCols - 1].colId + 1;
  } else {
    entry.ctbEntry.ctime = pReq->ctime;
    entry.ctbEntry.ttlDays = pReq->ttl;
    entry.ctbEntry.suid = pReq->ctb.suid;
    entry.ctbEntry.pTags = pReq->ctb.pTag;
  }

  if (metaHandleEntry(pMeta, &entry) < 0) {
    goto _err;
  }

  metaDebug("vgId:%d table %s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid,
            pReq->type);
  return 0;

_err:
  metaError("vgId:%d failed to create table:%s type:%s since %s", TD_VID(pMeta->pVnode), pReq->name,
            pReq->type == TSDB_CHILD_TABLE ? "child table" : "normal table", tstrerror(terrno));
  return -1;
}

265
/*
 * Drop a child table or a normal table by name.
 *
 * The table is removed from the name index and the uid index, its entry is
 * read from table.db to learn its type, and for child tables the matching
 * child-table index record is removed and the uid is appended to tbUids.
 * Removal from the tag index, schema db and ttl index is still a TODO.
 *
 * pMeta   - meta handle
 * version - not used by this function
 * pReq    - drop request carrying the table name
 * tbUids  - array that receives the uid of a dropped child table
 *
 * Returns 0 on success, -1 on failure. terrno is set to
 * TSDB_CODE_VND_TABLE_NOT_EXIST when the name is unknown and to
 * TSDB_CODE_OUT_OF_MEMORY when the entry copy cannot be allocated.
 */
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
  TBC        *pTbDbc = NULL;
  TBC        *pUidIdxc = NULL;
  TBC        *pNameIdxc = NULL;
  TBC        *pCtbIdxc = NULL;
  void       *pDataCopy = NULL;
  const void *pData;
  int         nData;
  tb_uid_t    uid;
  int64_t     tver;
  SMetaEntry  me = {0};
  SDecoder    coder = {0};
  int         decoderInited = 0;
  int8_t      type;
  int64_t     ctime = 0;  // initialised so the unexpected-type branch never reads garbage
  tb_uid_t    suid = 0;
  int         c = 0, ret;

  (void)version;

  // search & delete the name idx
  tdbTbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
  ret = tdbTbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
  if (ret < 0 || !tdbTbcIsValid(pNameIdxc) || c) {
    tdbTbcClose(pNameIdxc);
    terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
    return -1;
  }

  ret = tdbTbcGet(pNameIdxc, NULL, NULL, &pData, &nData);
  if (ret < 0) {
    ASSERT(0);
    goto _err;
  }

  uid = *(tb_uid_t *)pData;

  tdbTbcDelete(pNameIdxc);
  tdbTbcClose(pNameIdxc);
  pNameIdxc = NULL;

  // search & delete uid idx
  tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
  ret = tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
  if (ret < 0 || c != 0) {
    ASSERT(0);
    goto _err;
  }

  ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
  if (ret < 0) {
    ASSERT(0);
    goto _err;
  }

  tver = *(int64_t *)pData;
  tdbTbcDelete(pUidIdxc);
  tdbTbcClose(pUidIdxc);
  pUidIdxc = NULL;

  // search and get meta entry
  tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
  ret = tdbTbcMoveTo(pTbDbc, &(STbDbKey){.uid = uid, .version = tver}, sizeof(STbDbKey), &c);
  if (ret < 0 || c != 0) {
    ASSERT(0);
    goto _err;
  }

  ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
  if (ret < 0) {
    ASSERT(0);
    goto _err;
  }

  // decode entry
  pDataCopy = taosMemoryMalloc(nData);  // remove the copy (todo)
  if (pDataCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  memcpy(pDataCopy, pData, nData);
  tDecoderInit(&coder, pDataCopy, nData);
  decoderInited = 1;
  ret = metaDecodeEntry(&coder, &me);
  if (ret < 0) {
    ASSERT(0);
    goto _err;
  }

  type = me.type;
  if (type == TSDB_CHILD_TABLE) {
    ctime = me.ctbEntry.ctime;
    suid = me.ctbEntry.suid;
    taosArrayPush(tbUids, &me.uid);
  } else if (type == TSDB_NORMAL_TABLE) {
    ctime = me.ntbEntry.ctime;
    suid = 0;
  } else {
    ASSERT(0);
  }

  // everything needed from the entry has been copied out by value
  tDecoderClear(&coder);
  decoderInited = 0;
  taosMemoryFree(pDataCopy);
  pDataCopy = NULL;
  tdbTbcClose(pTbDbc);
  pTbDbc = NULL;

  if (type == TSDB_CHILD_TABLE) {
    // remove the pCtbIdx
    tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);

    ret = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = uid}, sizeof(SCtbIdxKey), &c);
    if (ret < 0 || c != 0) {
      ASSERT(0);
      goto _err;
    }

    tdbTbcDelete(pCtbIdxc);
    tdbTbcClose(pCtbIdxc);
    pCtbIdxc = NULL;

    // remove tags from pTagIdx (todo)
  } else if (type == TSDB_NORMAL_TABLE) {
    // remove from pSkmDb
  } else {
    ASSERT(0);
  }

  // remove from ttl (todo)
  if (ctime > 0) {
  }

  return 0;

_err:
  // release whatever is still held at the point of failure
  if (decoderInited) tDecoderClear(&coder);
  if (pDataCopy) taosMemoryFree(pDataCopy);
  if (pCtbIdxc) tdbTbcClose(pCtbIdxc);
  if (pTbDbc) tdbTbcClose(pTbDbc);
  if (pUidIdxc) tdbTbcClose(pUidIdxc);
  if (pNameIdxc) tdbTbcClose(pNameIdxc);
  return -1;
}
H
Hongze Cheng 已提交
385

H
Hongze Cheng 已提交
386
/*
 * Apply a column-level alteration (add, drop, widen, rename) to a normal table.
 *
 * The table is found by name, its current entry is decoded from table.db, the
 * schema is modified in memory and the updated entry is written back together
 * with a new uid-index value and a new schema record.
 *
 * pMeta       - meta handle
 * version     - version recorded for the rewritten entry
 * pAlterTbReq - request naming the table, the column and the action
 *
 * Returns 0 on success, -1 on failure with terrno set:
 *   TSDB_CODE_VND_TABLE_NOT_EXIST       unknown table name
 *   TSDB_CODE_VND_INVALID_TABLE_ACTION  not a normal table, column id 0 dropped,
 *                                       illegal width change, unknown action
 *   TSDB_CODE_VND_COL_ALREADY_EXISTS    column to add is already present
 *   TSDB_CODE_VND_TABLE_COL_NOT_EXISTS  column to drop/modify is missing
 *   TSDB_CODE_OUT_OF_MEMORY             allocation failure
 */
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
  void           *pVal = NULL;
  int             nVal = 0;
  const void     *pData = NULL;
  int             nData = 0;
  int             ret = 0;
  tb_uid_t        uid;
  int64_t         oversion;
  SSchema        *pColumn = NULL;
  SSchema        *pNewSchema = NULL;
  SMetaEntry      entry = {0};
  SSchemaWrapper *pSchema;
  SDecoder        dc = {0};
  TBC            *pUidIdxc = NULL;
  TBC            *pTbDbc = NULL;
  int32_t         iCol = 0;
  int             tlen;
  int             c;

  // search name index
  ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
  if (ret < 0) {
    terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
    return -1;
  }

  uid = *(tb_uid_t *)pVal;
  tdbFree(pVal);
  pVal = NULL;

  // search uid index
  tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
  tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
  ASSERT(c == 0);

  tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
  oversion = *(int64_t *)pData;

  // search table.db
  tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
  tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
  ASSERT(c == 0);
  tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);

  // get table entry, decoded from a private copy of the stored bytes
  entry.pBuf = taosMemoryMalloc(nData);
  if (entry.pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tdbTbcClose(pTbDbc);
    tdbTbcClose(pUidIdxc);
    return -1;
  }
  memcpy(entry.pBuf, pData, nData);
  tDecoderInit(&dc, entry.pBuf, nData);
  ret = metaDecodeEntry(&dc, &entry);
  ASSERT(ret == 0);

  if (entry.type != TSDB_NORMAL_TABLE) {
    terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
    goto _err;
  }

  // search the column to add/drop/update; pColumn stays NULL and iCol == nCols when absent
  pSchema = &entry.ntbEntry.schema;
  for (iCol = 0; iCol < pSchema->nCols; iCol++) {
    if (strcmp(pSchema->pSchema[iCol].name, pAlterTbReq->colName) == 0) {
      pColumn = &pSchema->pSchema[iCol];
      break;
    }
  }

  entry.version = version;
  switch (pAlterTbReq->action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN:
      if (pColumn) {
        terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
        goto _err;
      }
      // grow into a fresh array; the old array is not freed here
      pNewSchema = taosMemoryMalloc(sizeof(SSchema) * (pSchema->nCols + 1));
      if (pNewSchema == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * pSchema->nCols);
      pSchema->pSchema = pNewSchema;
      pSchema->sver++;
      pSchema->nCols++;

      pColumn = &pSchema->pSchema[pSchema->nCols - 1];
      pColumn->bytes = pAlterTbReq->bytes;
      pColumn->type = pAlterTbReq->type;
      pColumn->flags = pAlterTbReq->flags;
      pColumn->colId = entry.ntbEntry.ncid++;
      strcpy(pColumn->name, pAlterTbReq->colName);
      break;
    case TSDB_ALTER_TABLE_DROP_COLUMN:
      if (pColumn == NULL) {
        terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
        goto _err;
      }
      if (pColumn->colId == 0) {
        terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
        goto _err;
      }
      pSchema->sver++;
      // close the gap by shifting the trailing columns down by one slot
      tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
      if (tlen) {
        memmove(pColumn, pColumn + 1, tlen);
      }
      pSchema->nCols--;
      break;
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
      if (pColumn == NULL) {
        terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
        goto _err;
      }
      // only variable-length columns may change width, and never to a smaller one
      if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes > pAlterTbReq->colModBytes) {
        terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
        goto _err;
      }
      pSchema->sver++;
      pColumn->bytes = pAlterTbReq->colModBytes;
      break;
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
      if (pColumn == NULL) {
        terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
        goto _err;
      }
      pSchema->sver++;
      strcpy(pColumn->name, pAlterTbReq->colNewName);
      break;
    default:
      terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
      goto _err;
  }

  // do actual write
  metaWLock(pMeta);

  // save to table db
  metaSaveToTbDb(pMeta, &entry);

  tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);

  metaSaveToSkmDb(pMeta, &entry);

  metaULock(pMeta);

  if (pNewSchema) taosMemoryFree(pNewSchema);
  tDecoderClear(&dc);
  taosMemoryFree(entry.pBuf);  // the entry copy is owned by this function
  tdbTbcClose(pTbDbc);
  tdbTbcClose(pUidIdxc);
  return 0;

_err:
  tDecoderClear(&dc);
  taosMemoryFree(entry.pBuf);
  tdbTbcClose(pTbDbc);
  tdbTbcClose(pUidIdxc);
  return -1;
}

/*
 * Replace the value of one tag of a child table.
 *
 * The child entry and its super table entry are both loaded. The tag named in
 * the request is looked up in the super table's tag schema, a new tag blob is
 * built, and the child entry is rewritten with the new version.
 *
 * When the tag schema consists of a single JSON column, the request value is
 * copied verbatim as the new tag blob. Otherwise a new KV row is built from
 * the old tag values with the addressed column replaced.
 *
 * NOTE(review): unlike metaAlterTableColumn(), the writes here are not
 * wrapped in metaWLock()/metaULock() - confirm whether the caller holds it.
 *
 * Returns 0 on success, -1 on failure with terrno set:
 *   TSDB_CODE_VND_TABLE_NOT_EXIST       table or its super table is missing
 *   TSDB_CODE_VND_TABLE_COL_NOT_EXISTS  unknown tag name
 *   TSDB_CODE_OUT_OF_MEMORY             allocation failure
 */
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
  SMetaEntry  ctbEntry = {0};
  SMetaEntry  stbEntry = {0};
  void       *pVal = NULL;
  int         nVal = 0;
  int         ret;
  int         c;
  tb_uid_t    uid;
  int64_t     oversion;
  const void *pData = NULL;
  int         nData = 0;
  TBC        *pUidIdxc = NULL;
  TBC        *pTbDbc = NULL;
  SDecoder    dc1 = {0};
  SDecoder    dc2 = {0};
  int         dc1Inited = 0;
  int         dc2Inited = 0;
  void       *pNewTags = NULL;  // tag blob allocated here; the old one is not freed by this function

  // search name index
  ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
  if (ret < 0) {
    terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
    return -1;
  }

  uid = *(tb_uid_t *)pVal;
  tdbFree(pVal);
  pVal = NULL;

  // search uid index
  tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
  tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
  ASSERT(c == 0);

  tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
  oversion = *(int64_t *)pData;

  /* get ctbEntry */
  tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
  tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
  ASSERT(c == 0);
  tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);

  ctbEntry.pBuf = taosMemoryMalloc(nData);
  if (ctbEntry.pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  memcpy(ctbEntry.pBuf, pData, nData);
  tDecoderInit(&dc1, ctbEntry.pBuf, nData);
  dc1Inited = 1;
  metaDecodeEntry(&dc1, &ctbEntry);

  /* get stbEntry*/
  ret = tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
  if (ret < 0) {
    // without a uid index record the super table version is unknown
    terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
    goto _err;
  }
  ret = tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}),
                 sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal);
  tdbFree(pVal);
  pVal = NULL;
  if (ret < 0) {
    terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
    goto _err;
  }
  tDecoderInit(&dc2, stbEntry.pBuf, nVal);
  dc2Inited = 1;
  metaDecodeEntry(&dc2, &stbEntry);

  // find the tag column addressed by the request
  SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
  SSchema        *pColumn = NULL;
  int32_t         iCol = 0;
  for (iCol = 0; iCol < pTagSchema->nCols; iCol++) {
    if (strcmp(pTagSchema->pSchema[iCol].name, pAlterTbReq->tagName) == 0) {
      pColumn = &pTagSchema->pSchema[iCol];
      break;
    }
  }

  if (pColumn == NULL) {
    terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
    goto _err;
  }

  // TODO : need to update tag index when iCol == 0

  ctbEntry.version = version;
  if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
    // single JSON tag: the request value is the complete new tag blob
    pNewTags = taosMemoryMalloc(pAlterTbReq->nTagVal);
    if (pNewTags == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    memcpy(pNewTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
  } else {
    SKVRowBuilder kvrb = {0};
    const SKVRow  pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags;

    tdInitKVRowBuilder(&kvrb);
    for (int32_t i = 0; i < pTagSchema->nCols; i++) {
      SSchema *pCol = &pTagSchema->pSchema[i];
      if (iCol == i) {
        tdAddColToKVRow(&kvrb, pCol->colId, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
      } else {
        // carry over the old value; columns without a stored value are skipped
        void *p = tdGetKVRowValOfCol(pOldTag, pCol->colId);
        if (p) {
          if (IS_VAR_DATA_TYPE(pCol->type)) {
            tdAddColToKVRow(&kvrb, pCol->colId, p, varDataTLen(p));
          } else {
            tdAddColToKVRow(&kvrb, pCol->colId, p, pCol->bytes);
          }
        }
      }
    }

    pNewTags = tdGetKVRowFromBuilder(&kvrb);
    tdDestroyKVRowBuilder(&kvrb);
    if (pNewTags == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }
  ctbEntry.ctbEntry.pTags = pNewTags;

  // save to table.db
  metaSaveToTbDb(pMeta, &ctbEntry);

  // save to uid.idx
  tdbTbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);

  tDecoderClear(&dc1);
  tDecoderClear(&dc2);
  taosMemoryFree(pNewTags);
  if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
  if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
  tdbTbcClose(pTbDbc);
  tdbTbcClose(pUidIdxc);
  return 0;

_err:
  if (dc1Inited) tDecoderClear(&dc1);
  if (dc2Inited) tDecoderClear(&dc2);
  if (pVal) tdbFree(pVal);
  if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
  if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
  tdbTbcClose(pTbDbc);
  tdbTbcClose(pUidIdxc);
  return -1;
}

/*
 * Placeholder for altering table options: not implemented yet.
 * Trips ASSERT(0) and then reports success without touching any argument.
 */
static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
  (void)pMeta;
  (void)version;
  (void)pAlterTbReq;

  // TODO
  ASSERT(0);
  return 0;
}

/*
 * Dispatch an alter-table request to the handler for its action.
 *
 * Column actions go to metaAlterTableColumn(), tag value updates to
 * metaUpdateTableTagVal() and option updates to metaUpdateTableOptions().
 * Any other action sets terrno to TSDB_CODE_VND_INVALID_TABLE_ACTION.
 *
 * Returns the handler's result, or -1 for an unknown action.
 */
int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
  int code = -1;

  switch (pReq->action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_DROP_COLUMN:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
      code = metaAlterTableColumn(pMeta, version, pReq);
      break;
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
      code = metaUpdateTableTagVal(pMeta, version, pReq);
      break;
    case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
      code = metaUpdateTableOptions(pMeta, version, pReq);
      break;
    default:
      terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
      break;
  }

  return code;
}

H
Hongze Cheng 已提交
700
/*
 * Encode a meta entry and insert it into table.db under the key
 * (pME->version, pME->uid).
 *
 * Returns 0 on success, -1 when sizing, allocating, encoding or inserting
 * fails. terrno is set to TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
  STbDbKey tbDbKey = {0};
  void    *pVal = NULL;
  int      vLen = 0;
  int32_t  ret = 0;
  SEncoder coder = {0};

  // set key and value
  tbDbKey.version = pME->version;
  tbDbKey.uid = pME->uid;

  // first pass only measures the encoded size
  tEncodeSize(metaEncodeEntry, pME, vLen, ret);
  if (ret < 0) {
    goto _err;
  }

  pVal = taosMemoryMalloc(vLen);
  if (pVal == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tEncoderInit(&coder, pVal, vLen);
  ret = metaEncodeEntry(&coder, pME);
  // the encoder is cleared on failure as well, so a failed encode does not leave it initialised
  tEncoderClear(&coder);
  if (ret < 0) {
    goto _err;
  }

  // write to table.db
  if (tdbTbInsert(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), pVal, vLen, &pMeta->txn) < 0) {
    goto _err;
  }

  taosMemoryFree(pVal);
  return 0;

_err:
  taosMemoryFree(pVal);
  return -1;
}

H
Hongze Cheng 已提交
748
/*
 * Insert a uid-index record for the entry: key is pME->uid, value is
 * pME->version. Returns the result of tdbTbInsert().
 */
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
  const void *pUid = &pME->uid;
  const void *pVersion = &pME->version;

  int rc = tdbTbInsert(pMeta->pUidIdx, pUid, sizeof(tb_uid_t), pVersion, sizeof(int64_t), &pMeta->txn);
  return rc;
}

H
Hongze Cheng 已提交
752
/*
 * Insert a name-index record for the entry: key is the table name including
 * its terminating NUL, value is pME->uid. Returns the result of tdbTbInsert().
 */
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
  int         nameLen = strlen(pME->name) + 1;  // key includes the trailing '\0'
  const void *pUid = &pME->uid;

  int rc = tdbTbInsert(pMeta->pNameIdx, pME->name, nameLen, pUid, sizeof(tb_uid_t), &pMeta->txn);
  return rc;
}

H
Hongze Cheng 已提交
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775
/*
 * Insert a ttl-index record for a child table or a normal table.
 *
 * Nothing is inserted when the entry's ttlDays is not positive. Otherwise the
 * key is (ctime + ttlDays * 24 * 60 * 60, uid) with an empty value.
 *
 * Returns 0 when no record is needed, otherwise the result of tdbTbInsert().
 */
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
  // zero defaults keep the unexpected-type branch well-defined when ASSERT is compiled out
  int32_t    ttlDays = 0;
  int64_t    ctime = 0;
  STtlIdxKey ttlKey;

  if (pME->type == TSDB_CHILD_TABLE) {
    ctime = pME->ctbEntry.ctime;
    ttlDays = pME->ctbEntry.ttlDays;
  } else if (pME->type == TSDB_NORMAL_TABLE) {
    ctime = pME->ntbEntry.ctime;
    ttlDays = pME->ntbEntry.ttlDays;
  } else {
    ASSERT(0);
  }

  if (ttlDays <= 0) return 0;

  // widen before multiplying: the product overflows int32_t for large ttlDays
  ttlKey.dtime = ctime + (int64_t)ttlDays * 24 * 60 * 60;
  ttlKey.uid = pME->uid;

  return tdbTbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
}

H
Hongze Cheng 已提交
779 780
/*
 * Insert a child-table index record: key is (suid, uid) of the child entry,
 * value is empty. Returns the result of tdbTbInsert().
 */
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
  SCtbIdxKey key = {.uid = pME->uid, .suid = pME->ctbEntry.suid};

  int rc = tdbTbInsert(pMeta->pCtbIdx, &key, sizeof(key), NULL, 0, &pMeta->txn);
  return rc;
}

H
Hongze Cheng 已提交
784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817
/*
 * Allocate and fill a tag-index key.
 *
 * Layout of the key: the STagIdxKey header, followed in `data` by the tag
 * value bytes (none when pTagData is NULL) and then the table uid.
 *
 * suid, cid    - super table uid and tag column id stored in the header
 * pTagData     - tag value, or NULL for a null tag (isNull is set to 1)
 * type         - tag data type; selects how the value length is obtained
 * uid          - table uid appended after the tag value
 * ppTagIdxKey  - receives the allocated key (NULL when allocation fails);
 *                release it with metaDestroyTagIdxKey()
 * nTagIdxKey   - receives the total key size in bytes
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
                               STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
  STagIdxKey *pKey = NULL;
  int32_t     nTagData = 0;

  if (pTagData) {
    if (IS_VAR_DATA_TYPE(type)) {
      nTagData = varDataTLen(pTagData);
    } else {
      nTagData = tDataTypes[type].bytes;
    }
  }
  *nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t);

  pKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
  *ppTagIdxKey = pKey;
  if (pKey == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pKey->suid = suid;
  pKey->cid = cid;
  pKey->isNull = (pTagData == NULL) ? 1 : 0;
  pKey->type = type;
  if (nTagData) memcpy(pKey->data, pTagData, nTagData);
  // the uid slot sits at an arbitrary byte offset, so copy it instead of
  // storing through a casted pointer that may be misaligned
  memcpy(pKey->data + nTagData, &uid, sizeof(uid));

  return 0;
}

/* Release a key built by metaCreateTagIdxKey(); a NULL key is ignored. */
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
  if (pTagIdxKey == NULL) {
    return;
  }
  taosMemoryFree(pTagIdxKey);
}

/*
 * Add a tag-index record for a child table.
 *
 * The child's super table entry is loaded to obtain the first column of the
 * tag schema; the child's value for that column is then indexed. With
 * USE_INVERTED_INDEX the value goes into the inverted index, otherwise a key
 * built by metaCreateTagIdxKey() is inserted into pTagIdx.
 *
 * Returns 0 on success, -1 when the super table cannot be looked up or
 * decoded, or when the index key cannot be built.
 */
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
  void          *pData = NULL;
  int            nData = 0;
  STbDbKey       tbDbKey = {0};
  SMetaEntry     stbEntry = {0};
  STagIdxKey    *pTagIdxKey = NULL;
  int32_t        nTagIdxKey;
  const SSchema *pTagColumn;       // = &stbEntry.stbEntry.schema.pSchema[0];
  const void    *pTagData = NULL;  //
  SDecoder       dc = {0};
  int            code = 0;

  // get super table: uid index gives the version, table.db gives the entry
  if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) < 0) {
    tdbFree(pData);
    return -1;
  }
  tbDbKey.uid = pCtbEntry->ctbEntry.suid;
  tbDbKey.version = *(int64_t *)pData;
  if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData) < 0) {
    tdbFree(pData);
    return -1;
  }

  tDecoderInit(&dc, pData, nData);
  if (metaDecodeEntry(&dc, &stbEntry) < 0) {
    code = -1;
    goto _exit;
  }

  pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
  pTagData = tdGetKVRowValOfCol((const SKVRow)pCtbEntry->ctbEntry.pTags, pTagColumn->colId);

  // update tag index
#ifdef USE_INVERTED_INDEX
  tb_uid_t suid = pCtbEntry->ctbEntry.suid;
  tb_uid_t tuid = pCtbEntry->uid;

  SIndexMultiTerm *tmGroup = indexMultiTermCreate();

  SIndexTerm *tm = indexTermCreate(suid, ADD_VALUE, pTagColumn->type, pTagColumn->name, sizeof(pTagColumn->name),
                                   pTagData, pTagData == NULL ? 0 : strlen(pTagData));
  indexMultiTermAdd(tmGroup, tm);
  int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid);
  indexMultiTermDestroy(tmGroup);
#else
  if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, pTagColumn->type, pCtbEntry->uid,
                          &pTagIdxKey, &nTagIdxKey) < 0) {
    // fall through to the common cleanup so the decoder and pData are released
    code = -1;
    goto _exit;
  }
  // NOTE(review): the insert result is not checked - confirm whether a failure should be reported
  tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
  metaDestroyTagIdxKey(pTagIdxKey);
#endif

_exit:
  tDecoderClear(&dc);
  tdbFree(pData);
  return code;
}

H
Hongze Cheng 已提交
865
/*
 * Persist the schema of a super table or a normal table into pSkmDb.
 *
 * The record key is {uid, schema version}; the value is the schema wrapper
 * serialized with tEncodeSSchemaWrapper.
 *
 * pME must describe a TSDB_SUPER_TABLE or a TSDB_NORMAL_TABLE; any other
 * type trips ASSERT(0) and is rejected with -1.
 *
 * Returns 0 on success, -1 on failure (terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when the value buffer cannot be allocated).
 */
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
  SEncoder              coder = {0};
  void                 *pVal = NULL;
  int                   vLen = 0;
  int                   rcode = 0;
  SSkmDbKey             skmDbKey = {0};
  const SSchemaWrapper *pSW = NULL;

  if (pME->type == TSDB_SUPER_TABLE) {
    pSW = &pME->stbEntry.schema;
  } else if (pME->type == TSDB_NORMAL_TABLE) {
    pSW = &pME->ntbEntry.schema;
  } else {
    ASSERT(0);
    // If ASSERT is a no-op in this build, bail out instead of going on to
    // dereference a schema pointer that was never set.
    return -1;
  }

  skmDbKey.uid = pME->uid;
  skmDbKey.sver = pSW->sver;

  // encode schema: first pass computes the encoded size only
  int32_t ret = 0;
  tEncodeSize(tEncodeSSchemaWrapper, pSW, vLen, ret);
  if (ret < 0) return -1;

  pVal = taosMemoryMalloc(vLen);
  if (pVal == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // second pass writes the schema into the freshly allocated buffer
  tEncoderInit(&coder, pVal, vLen);
  if (tEncodeSSchemaWrapper(&coder, pSW) < 0) {
    rcode = -1;
    goto _exit;
  }

  if (tdbTbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
    rcode = -1;
    goto _exit;
  }

_exit:
  taosMemoryFree(pVal);
  tEncoderClear(&coder);
  return rcode;
}

/*
 * Write one table entry into the meta store and refresh every index that
 * depends on it, all under the meta write lock.
 *
 * Order of updates:
 *   - table.db, uid.idx and name.idx for every entry;
 *   - ctb.idx and tag.idx for child tables, schema.db for all other types;
 *   - ttl.idx for everything except super tables.
 *
 * The first failing step stops the sequence. The lock is released on every
 * path.
 *
 * Returns 0 when all steps succeed, -1 otherwise.
 */
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
  int result = -1;

  metaWLock(pMeta);

  // save to table.db
  if (metaSaveToTbDb(pMeta, pME) < 0) {
    goto _unlock;
  }

  // update uid.idx
  if (metaUpdateUidIdx(pMeta, pME) < 0) {
    goto _unlock;
  }

  // update name.idx
  if (metaUpdateNameIdx(pMeta, pME) < 0) {
    goto _unlock;
  }

  if (pME->type != TSDB_CHILD_TABLE) {
    // update schema.db
    if (metaSaveToSkmDb(pMeta, pME) < 0) {
      goto _unlock;
    }
  } else {
    // update ctb.idx
    if (metaUpdateCtbIdx(pMeta, pME) < 0) {
      goto _unlock;
    }

    // update tag.idx
    if (metaUpdateTagIdx(pMeta, pME) < 0) {
      goto _unlock;
    }
  }

  // super tables have no ttl.idx entry
  if (pME->type != TSDB_SUPER_TABLE && metaUpdateTtlIdx(pMeta, pME) < 0) {
    goto _unlock;
  }

  result = 0;

_unlock:
  metaULock(pMeta);
  return result;
}