diff --git a/source/dnode/vnode/tsdb/inc/tsdbCompact.h b/source/dnode/vnode/tsdb/inc/tsdbCompact.h
index 441dfda6ade603c2dd1240962543307ca3bb83f9..c5df4e27acbb9ffb067e1c3af438a35fdd1ecd35 100644
--- a/source/dnode/vnode/tsdb/inc/tsdbCompact.h
+++ b/source/dnode/vnode/tsdb/inc/tsdbCompact.h
@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+#if 0
+
#ifndef _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_
@@ -19,14 +21,12 @@
extern "C" {
#endif
-#if 0
-
void *tsdbCompactImpl(STsdbRepo *pRepo);
-#endif
-
#ifdef __cplusplus
}
#endif
-#endif /* _TD_TSDB_COMPACT_H_ */
\ No newline at end of file
+#endif /* _TD_TSDB_COMPACT_H_ */
+
+#endif
\ No newline at end of file
diff --git a/source/dnode/vnode/tsdb/inc/tsdbDef.h b/source/dnode/vnode/tsdb/inc/tsdbDef.h
index e81c51441f39e83f206172236b55b5c78c4e3e8c..042fafe13f849de81b50238157d95c7ca88c6231 100644
--- a/source/dnode/vnode/tsdb/inc/tsdbDef.h
+++ b/source/dnode/vnode/tsdb/inc/tsdbDef.h
@@ -54,6 +54,10 @@ struct STsdb {
#define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (&(r)->fs)
+static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) {
+ return pTable->pSchema;
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/source/dnode/vnode/tsdb/inc/tsdbMemTable.h b/source/dnode/vnode/tsdb/inc/tsdbMemTable.h
index b4d104065606ff2407eefdfba17c78d4848bcdb7..eea322006e210e8938bb934feb9c81ded41f754f 100644
--- a/source/dnode/vnode/tsdb/inc/tsdbMemTable.h
+++ b/source/dnode/vnode/tsdb/inc/tsdbMemTable.h
@@ -22,6 +22,16 @@
extern "C" {
#endif
+typedef struct {
+ int rowsInserted;
+ int rowsUpdated;
+ int rowsDeleteSucceed;
+ int rowsDeleteFailed;
+ int nOperations;
+ TSKEY keyFirst;
+ TSKEY keyLast;
+} SMergeInfo;
+
typedef struct STbData {
tb_uid_t uid;
TSKEY keyMin;
@@ -42,10 +52,20 @@ typedef struct STsdbMemTable {
SHashObj * pHashIdx;
} STsdbMemTable;
-
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
+int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
+ TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
+
+static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator *pIter) {
+ if (pIter == NULL) return NULL;
+
+ SSkipListNode *node = tSkipListIterGet(pIter);
+ if (node == NULL) return NULL;
+
+ return (SMemRow)SL_GET_NODE_DATA(node);
+}
#ifdef __cplusplus
}
diff --git a/source/dnode/vnode/tsdb/src/tsdbCommit.c b/source/dnode/vnode/tsdb/src/tsdbCommit.c
index 86f279a1a56f1ace08883468c4f65b3ab3ad8642..0bf7b63307ca798fb17a664e2890e9fea774c49e 100644
--- a/source/dnode/vnode/tsdb/src/tsdbCommit.c
+++ b/source/dnode/vnode/tsdb/src/tsdbCommit.c
@@ -60,6 +60,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbNextCommitFid(SCommitH *pCommith);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static int tsdbCreateCommitIters(SCommitH *pCommith);
+static void tsdbDestroyCommitIters(SCommitH *pCommith);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
@@ -127,7 +128,6 @@ int tsdbCommit(STsdb *pRepo) {
return -1;
}
-#if 0
// Skip expired memory data and expired FSET
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
@@ -138,6 +138,7 @@ int tsdbCommit(STsdb *pRepo) {
break;
}
}
+#if 0
// Loop to commit to each file
fid = tsdbNextCommitFid(&(commith));
@@ -280,28 +281,28 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
}
}
-static int tsdbNextCommitFid(SCommitH *pCommith) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
- STsdbCfg *pCfg = REPO_CFG(pRepo);
- int fid = TSDB_IVLD_FID;
-
- for (int i = 0; i < pCommith->niters; i++) {
- SCommitIter *pIter = pCommith->iters + i;
- if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
-
- TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
- if (nextKey == TSDB_DATA_TIMESTAMP_NULL) {
- continue;
- } else {
- int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision));
- if (fid == TSDB_IVLD_FID || fid > tfid) {
- fid = tfid;
- }
- }
- }
-
- return fid;
-}
+// static int tsdbNextCommitFid(SCommitH *pCommith) {
+// STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
+// STsdbCfg *pCfg = REPO_CFG(pRepo);
+// int fid = TSDB_IVLD_FID;
+
+// for (int i = 0; i < pCommith->niters; i++) {
+// SCommitIter *pIter = pCommith->iters + i;
+// if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
+
+// TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
+// if (nextKey == TSDB_DATA_TIMESTAMP_NULL) {
+// continue;
+// } else {
+// int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision));
+// if (fid == TSDB_IVLD_FID || fid > tfid) {
+// fid = tfid;
+// }
+// }
+// }
+
+// return fid;
+// }
static void tsdbDestroyCommitH(SCommitH *pCommith) {
pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols);
@@ -313,117 +314,109 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
-static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
- STsdbCfg *pCfg = REPO_CFG(pRepo);
-
- ASSERT(pSet == NULL || pSet->fid == fid);
-
- tsdbResetCommitFile(pCommith);
- tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
-
- // Set and open files
- if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
- return -1;
- }
+// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
+// STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
+// STsdbCfg *pCfg = REPO_CFG(pRepo);
+
+// ASSERT(pSet == NULL || pSet->fid == fid);
+
+// tsdbResetCommitFile(pCommith);
+// tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
+
+// // Set and open files
+// if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
+// return -1;
+// }
+
+// // Loop to commit each table data
+// for (int tid = 1; tid < pCommith->niters; tid++) {
+// SCommitIter *pIter = pCommith->iters + tid;
+
+// if (pIter->pTable == NULL) continue;
+
+// if (tsdbCommitToTable(pCommith, tid) < 0) {
+// tsdbCloseCommitFile(pCommith, true);
+// // revert the file change
+// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
+// return -1;
+// }
+// }
+
+// if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith))))
+// <
+// 0) {
+// tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
+// tsdbCloseCommitFile(pCommith, true);
+// // revert the file change
+// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
+// return -1;
+// }
+
+// if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
+// tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
+// tsdbCloseCommitFile(pCommith, true);
+// // revert the file change
+// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
+// return -1;
+// }
+
+// // Close commit file
+// tsdbCloseCommitFile(pCommith, false);
+
+// if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
+// return -1;
+// }
+
+// return 0;
+// }
- // Loop to commit each table data
- for (int tid = 1; tid < pCommith->niters; tid++) {
- SCommitIter *pIter = pCommith->iters + tid;
-
- if (pIter->pTable == NULL) continue;
-
- if (tsdbCommitToTable(pCommith, tid) < 0) {
- tsdbCloseCommitFile(pCommith, true);
- // revert the file change
- tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
- return -1;
- }
- }
-
- if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
- 0) {
- tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
- tsdbCloseCommitFile(pCommith, true);
- // revert the file change
- tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
+static int tsdbCreateCommitIters(SCommitH *pCommith) {
+ STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdbMemTable * pMem = pRepo->imem;
+ SSkipListIterator *pSlIter;
+ SCommitIter * pCommitIter;
+ SSkipListNode * pNode;
+ STbData * pTbData;
+
+ pCommith->niters = SL_SIZE(pMem->pSlIdx);
+ pCommith->iters = (SCommitIter *)calloc(pCommith->niters, sizeof(SCommitIter));
+ if (pCommith->iters == NULL) {
+ terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
- if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
- tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
- tsdbCloseCommitFile(pCommith, true);
- // revert the file change
- tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
+ // Loop to create iters for each skiplist
+ pSlIter = tSkipListCreateIter(pMem->pSlIdx);
+ if (pSlIter == NULL) {
+ terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
+ for (int i = 0; i < pCommith->niters; i++) {
+ tSkipListIterNext(pSlIter);
+ pNode = tSkipListIterGet(pSlIter);
+ pTbData = (STbData *)pNode->pData;
- // Close commit file
- tsdbCloseCommitFile(pCommith, false);
-
- if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
- return -1;
+ pCommitIter = pCommith->iters + i;
+ pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
+ tSkipListIterNext(pCommitIter->pIter);
}
return 0;
}
-static int tsdbCreateCommitIters(SCommitH *pCommith) {
-#if 0
- STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
- SMemTable *pMem = pRepo->imem;
- // STsdbMeta *pMeta = pRepo->tsdbMeta;
-
- pCommith->niters = pMem->maxTables;
- pCommith->iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
- if (pCommith->iters == NULL) {
- terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
- return -1;
- }
-
- if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
-
- // reference all tables
- for (int i = 0; i < pMem->maxTables; i++) {
- if (pMeta->tables[i] != NULL) {
- tsdbRefTable(pMeta->tables[i]);
- pCommith->iters[i].pTable = pMeta->tables[i];
- }
- }
-
- if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
-
- for (int i = 0; i < pMem->maxTables; i++) {
- if ((pCommith->iters[i].pTable != NULL) && (pMem->tData[i] != NULL) &&
- (TABLE_UID(pCommith->iters[i].pTable) == pMem->tData[i]->uid)) {
- if ((pCommith->iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
- terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
- return -1;
- }
+static void tsdbDestroyCommitIters(SCommitH *pCommith) {
+ if (pCommith->iters == NULL) return;
- tSkipListIterNext(pCommith->iters[i].pIter);
- }
+ for (int i = 1; i < pCommith->niters; i++) {
+ tSkipListDestroyIter(pCommith->iters[i].pIter);
}
-#endif
- return 0;
+ free(pCommith->iters);
+ pCommith->iters = NULL;
+ pCommith->niters = 0;
}
#if 0
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
#include "tsdbint.h"
extern int32_t tsTsdbMetaCompactRatio;
@@ -895,21 +888,6 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
-static void tsdbDestroyCommitIters(SCommitH *pCommith) {
- if (pCommith->iters == NULL) return;
-
- for (int i = 1; i < pCommith->niters; i++) {
- if (pCommith->iters[i].pTable != NULL) {
- tsdbUnRefTable(pCommith->iters[i].pTable);
- tSkipListDestroyIter(pCommith->iters[i].pIter);
- }
- }
-
- free(pCommith->iters);
- pCommith->iters = NULL;
- pCommith->niters = 0;
-}
-
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
diff --git a/source/dnode/vnode/tsdb/src/tsdbCompact.c b/source/dnode/vnode/tsdb/src/tsdbCompact.c
index 5ccb9e90f2407561709d36a85ac3e992e5d5a8ba..d890faea9bd67618f372794cbd67137617161622 100644
--- a/source/dnode/vnode/tsdb/src/tsdbCompact.c
+++ b/source/dnode/vnode/tsdb/src/tsdbCompact.c
@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+#if 0
#include "tsdbint.h"
typedef struct {
@@ -528,3 +529,5 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
return 0;
}
+#endif
+
diff --git a/source/dnode/vnode/tsdb/src/tsdbMemTable.c b/source/dnode/vnode/tsdb/src/tsdbMemTable.c
index f9f7ff70dc6133ed8a5da76df44119ea5842fd4c..4d0f436f49570915bc0cf6dd6c808f3688861b57 100644
--- a/source/dnode/vnode/tsdb/src/tsdbMemTable.c
+++ b/source/dnode/vnode/tsdb/src/tsdbMemTable.c
@@ -22,6 +22,7 @@ static void tsdbFreeTbData(STbData *pTbData);
static char * tsdbGetTsTupleKey(const void *data);
static int tsdbTbDataComp(const void *arg1, const void *arg2);
static char * tsdbTbDataGetUid(const void *arg);
+static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
STsdbMemTable *pMemTable = (STsdbMemTable *)calloc(1, sizeof(*pMemTable));
@@ -103,6 +104,129 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg,
return 0;
}
+/**
+ * This is an important function to load data or try to load data from memory skiplist iterator.
+ *
+ * This function load memory data until:
+ * 1. iterator ends
+ * 2. data key exceeds maxKey
+ * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
+ * 4. operations in pCols not exceeds its max capacity if pCols is given
+ *
+ * The function tries to procceed AS MUCH AS POSSIBLE.
+ */
+int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
+ TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
+ ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
+ if (pIter == NULL) return 0;
+ STSchema * pSchema = NULL;
+ TSKEY rowKey = 0;
+ TSKEY fKey = 0;
+ bool isRowDel = false;
+ int filterIter = 0;
+ SMemRow row = NULL;
+ SMergeInfo mInfo;
+
+ if (pMergeInfo == NULL) pMergeInfo = &mInfo;
+
+ memset(pMergeInfo, 0, sizeof(*pMergeInfo));
+ pMergeInfo->keyFirst = INT64_MAX;
+ pMergeInfo->keyLast = INT64_MIN;
+ if (pCols) tdResetDataCols(pCols);
+
+ row = tsdbNextIterRow(pIter);
+ if (row == NULL || memRowKey(row) > maxKey) {
+ rowKey = INT64_MAX;
+ isRowDel = false;
+ } else {
+ rowKey = memRowKey(row);
+ isRowDel = memRowDeleted(row);
+ }
+
+ if (filterIter >= nFilterKeys) {
+ fKey = INT64_MAX;
+ } else {
+ fKey = tdGetKey(filterKeys[filterIter]);
+ }
+
+ while (true) {
+ if (fKey == INT64_MAX && rowKey == INT64_MAX) break;
+
+ if (fKey < rowKey) {
+ pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
+ pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
+
+ filterIter++;
+ if (filterIter >= nFilterKeys) {
+ fKey = INT64_MAX;
+ } else {
+ fKey = tdGetKey(filterKeys[filterIter]);
+ }
+ } else if (fKey > rowKey) {
+ if (isRowDel) {
+ pMergeInfo->rowsDeleteFailed++;
+ } else {
+ if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
+ if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
+ pMergeInfo->rowsInserted++;
+ pMergeInfo->nOperations++;
+ pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
+ pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
+ tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
+ }
+
+ tSkipListIterNext(pIter);
+ row = tsdbNextIterRow(pIter);
+ if (row == NULL || memRowKey(row) > maxKey) {
+ rowKey = INT64_MAX;
+ isRowDel = false;
+ } else {
+ rowKey = memRowKey(row);
+ isRowDel = memRowDeleted(row);
+ }
+ } else {
+ if (isRowDel) {
+ ASSERT(!keepDup);
+ if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
+ pMergeInfo->rowsDeleteSucceed++;
+ pMergeInfo->nOperations++;
+ tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
+ } else {
+ if (keepDup) {
+ if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
+ pMergeInfo->rowsUpdated++;
+ pMergeInfo->nOperations++;
+ pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
+ pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
+ tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
+ } else {
+ pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
+ pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
+ }
+ }
+
+ tSkipListIterNext(pIter);
+ row = tsdbNextIterRow(pIter);
+ if (row == NULL || memRowKey(row) > maxKey) {
+ rowKey = INT64_MAX;
+ isRowDel = false;
+ } else {
+ rowKey = memRowKey(row);
+ isRowDel = memRowDeleted(row);
+ }
+
+ filterIter++;
+ if (filterIter >= nFilterKeys) {
+ fKey = INT64_MAX;
+ } else {
+ fKey = tdGetKey(filterKeys[filterIter]);
+ }
+ }
+ }
+
+ return 0;
+}
+
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
@@ -313,6 +437,21 @@ static char *tsdbTbDataGetUid(const void *arg) {
STbData *pTbData = (STbData *)arg;
return (char *)(&(pTbData->uid));
}
+static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
+ if (pCols) {
+ if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
+ *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row));
+ if (*ppSchema == NULL) {
+ ASSERT(false);
+ return -1;
+ }
+ }
+
+ tdAppendMemRowToDataCol(row, *ppSchema, pCols, true);
+ }
+
+ return 0;
+}
/* ------------------------ REFACTORING ------------------------ */
#if 0
@@ -650,21 +789,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
}
// ---------------- LOCAL FUNCTIONS ----------------
-static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
- if (pCols) {
- if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
- *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
- if (*ppSchema == NULL) {
- ASSERT(false);
- return -1;
- }
- }
-
- tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, 0);
- }
-
- return 0;
-}
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
diff --git a/source/dnode/vnode/tsdb/src/tsdbReadImpl.c b/source/dnode/vnode/tsdb/src/tsdbReadImpl.c
index 1a2b213031f02a13e4bfdd0ec6b3c37b4b8b7c72..c4beac452d2eb3cd637e850396d4c48ac3a710ff 100644
--- a/source/dnode/vnode/tsdb/src/tsdbReadImpl.c
+++ b/source/dnode/vnode/tsdb/src/tsdbReadImpl.c
@@ -25,7 +25,6 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int3
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds);
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
-static STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { return NULL; }
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
ASSERT(pReadh != NULL && pRepo != NULL);
@@ -667,4 +666,3 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
return 0;
}
-