未验证 提交 0908755d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4730 from taosdata/feature/TD-2354

Feature/td 2354
......@@ -5442,6 +5442,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg->quorum = pCreateDb->quorum;
pMsg->ignoreExist = pCreateDb->ignoreExists;
pMsg->update = pCreateDb->update;
pMsg->cacheLastRow = pCreateDb->cachelast;
}
int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) {
......
......@@ -94,6 +94,7 @@ extern int32_t tsFsyncPeriod;
extern int32_t tsReplications;
extern int32_t tsQuorum;
extern int32_t tsUpdate;
extern int32_t tsCacheLastRow;
// balance
extern int32_t tsEnableBalance;
......
......@@ -127,6 +127,7 @@ int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
int32_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION;
int32_t tsCacheLastRow = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
int32_t tsMaxVgroupsPerDb = 0;
int32_t tsMinTablePerVnode = TSDB_TABLES_STEP;
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
......
......@@ -217,4 +217,4 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
dnodeStartMnode(&pCfg->mnodes);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
}
......@@ -369,6 +369,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MAX_DB_UPDATE 1
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 1
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
......
......@@ -549,7 +549,8 @@ typedef struct {
int8_t quorum;
int8_t ignoreExist;
int8_t update;
int8_t reserve[9];
int8_t cacheLastRow;
int8_t reserve[8];
} SCreateDbMsg, SAlterDbMsg;
typedef struct {
......@@ -661,8 +662,9 @@ typedef struct {
int8_t wals;
int8_t quorum;
int8_t update;
int8_t reserved[11];
int8_t cacheLastRow;
int32_t vgCfgVersion;
int8_t reserved[10];
} SVnodeCfg;
typedef struct {
......
......@@ -66,6 +66,7 @@ typedef struct {
int8_t precision;
int8_t compression;
int8_t update;
int8_t cacheLastRow;
} STsdbCfg;
// --------- TSDB REPOSITORY USAGE STATISTICS
......@@ -119,7 +120,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
// TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
......
......@@ -114,114 +114,115 @@
#define TK_COMP 96
#define TK_PRECISION 97
#define TK_UPDATE 98
#define TK_LP 99
#define TK_RP 100
#define TK_TAGS 101
#define TK_USING 102
#define TK_AS 103
#define TK_COMMA 104
#define TK_NULL 105
#define TK_SELECT 106
#define TK_UNION 107
#define TK_ALL 108
#define TK_FROM 109
#define TK_VARIABLE 110
#define TK_INTERVAL 111
#define TK_FILL 112
#define TK_SLIDING 113
#define TK_ORDER 114
#define TK_BY 115
#define TK_ASC 116
#define TK_DESC 117
#define TK_GROUP 118
#define TK_HAVING 119
#define TK_LIMIT 120
#define TK_OFFSET 121
#define TK_SLIMIT 122
#define TK_SOFFSET 123
#define TK_WHERE 124
#define TK_NOW 125
#define TK_RESET 126
#define TK_QUERY 127
#define TK_ADD 128
#define TK_COLUMN 129
#define TK_TAG 130
#define TK_CHANGE 131
#define TK_SET 132
#define TK_KILL 133
#define TK_CONNECTION 134
#define TK_STREAM 135
#define TK_COLON 136
#define TK_ABORT 137
#define TK_AFTER 138
#define TK_ATTACH 139
#define TK_BEFORE 140
#define TK_BEGIN 141
#define TK_CASCADE 142
#define TK_CLUSTER 143
#define TK_CONFLICT 144
#define TK_COPY 145
#define TK_DEFERRED 146
#define TK_DELIMITERS 147
#define TK_DETACH 148
#define TK_EACH 149
#define TK_END 150
#define TK_EXPLAIN 151
#define TK_FAIL 152
#define TK_FOR 153
#define TK_IGNORE 154
#define TK_IMMEDIATE 155
#define TK_INITIALLY 156
#define TK_INSTEAD 157
#define TK_MATCH 158
#define TK_KEY 159
#define TK_OF 160
#define TK_RAISE 161
#define TK_REPLACE 162
#define TK_RESTRICT 163
#define TK_ROW 164
#define TK_STATEMENT 165
#define TK_TRIGGER 166
#define TK_VIEW 167
#define TK_COUNT 168
#define TK_SUM 169
#define TK_AVG 170
#define TK_MIN 171
#define TK_MAX 172
#define TK_FIRST 173
#define TK_LAST 174
#define TK_TOP 175
#define TK_BOTTOM 176
#define TK_STDDEV 177
#define TK_PERCENTILE 178
#define TK_APERCENTILE 179
#define TK_LEASTSQUARES 180
#define TK_HISTOGRAM 181
#define TK_DIFF 182
#define TK_SPREAD 183
#define TK_TWA 184
#define TK_INTERP 185
#define TK_LAST_ROW 186
#define TK_RATE 187
#define TK_IRATE 188
#define TK_SUM_RATE 189
#define TK_SUM_IRATE 190
#define TK_AVG_RATE 191
#define TK_AVG_IRATE 192
#define TK_TBID 193
#define TK_SEMI 194
#define TK_NONE 195
#define TK_PREV 196
#define TK_LINEAR 197
#define TK_IMPORT 198
#define TK_METRIC 199
#define TK_TBNAME 200
#define TK_JOIN 201
#define TK_METRICS 202
#define TK_STABLE 203
#define TK_INSERT 204
#define TK_INTO 205
#define TK_VALUES 206
#define TK_CACHELAST 99
#define TK_LP 100
#define TK_RP 101
#define TK_TAGS 102
#define TK_USING 103
#define TK_AS 104
#define TK_COMMA 105
#define TK_NULL 106
#define TK_SELECT 107
#define TK_UNION 108
#define TK_ALL 109
#define TK_FROM 110
#define TK_VARIABLE 111
#define TK_INTERVAL 112
#define TK_FILL 113
#define TK_SLIDING 114
#define TK_ORDER 115
#define TK_BY 116
#define TK_ASC 117
#define TK_DESC 118
#define TK_GROUP 119
#define TK_HAVING 120
#define TK_LIMIT 121
#define TK_OFFSET 122
#define TK_SLIMIT 123
#define TK_SOFFSET 124
#define TK_WHERE 125
#define TK_NOW 126
#define TK_RESET 127
#define TK_QUERY 128
#define TK_ADD 129
#define TK_COLUMN 130
#define TK_TAG 131
#define TK_CHANGE 132
#define TK_SET 133
#define TK_KILL 134
#define TK_CONNECTION 135
#define TK_STREAM 136
#define TK_COLON 137
#define TK_ABORT 138
#define TK_AFTER 139
#define TK_ATTACH 140
#define TK_BEFORE 141
#define TK_BEGIN 142
#define TK_CASCADE 143
#define TK_CLUSTER 144
#define TK_CONFLICT 145
#define TK_COPY 146
#define TK_DEFERRED 147
#define TK_DELIMITERS 148
#define TK_DETACH 149
#define TK_EACH 150
#define TK_END 151
#define TK_EXPLAIN 152
#define TK_FAIL 153
#define TK_FOR 154
#define TK_IGNORE 155
#define TK_IMMEDIATE 156
#define TK_INITIALLY 157
#define TK_INSTEAD 158
#define TK_MATCH 159
#define TK_KEY 160
#define TK_OF 161
#define TK_RAISE 162
#define TK_REPLACE 163
#define TK_RESTRICT 164
#define TK_ROW 165
#define TK_STATEMENT 166
#define TK_TRIGGER 167
#define TK_VIEW 168
#define TK_COUNT 169
#define TK_SUM 170
#define TK_AVG 171
#define TK_MIN 172
#define TK_MAX 173
#define TK_FIRST 174
#define TK_LAST 175
#define TK_TOP 176
#define TK_BOTTOM 177
#define TK_STDDEV 178
#define TK_PERCENTILE 179
#define TK_APERCENTILE 180
#define TK_LEASTSQUARES 181
#define TK_HISTOGRAM 182
#define TK_DIFF 183
#define TK_SPREAD 184
#define TK_TWA 185
#define TK_INTERP 186
#define TK_LAST_ROW 187
#define TK_RATE 188
#define TK_IRATE 189
#define TK_SUM_RATE 190
#define TK_SUM_IRATE 191
#define TK_AVG_RATE 192
#define TK_AVG_IRATE 193
#define TK_TBID 194
#define TK_SEMI 195
#define TK_NONE 196
#define TK_PREV 197
#define TK_LINEAR 198
#define TK_IMPORT 199
#define TK_METRIC 200
#define TK_TBNAME 201
#define TK_JOIN 202
#define TK_METRICS 203
#define TK_STABLE 204
#define TK_INSERT 205
#define TK_INTO 206
#define TK_VALUES 207
#define TK_SPACE 300
......
......@@ -174,7 +174,8 @@ typedef struct {
int8_t replications;
int8_t quorum;
int8_t update;
int8_t reserved[11];
int8_t cacheLastRow;
int8_t reserved[10];
} SDbCfg;
typedef struct SDbObj {
......
......@@ -322,6 +322,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) {
mError("invalid db option cacheLastRow:%d valid range: [%d, %d]", pCfg->cacheLastRow, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
return TSDB_CODE_SUCCESS;
}
......@@ -343,6 +348,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->replications < 0) pCfg->replications = tsReplications;
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
if (pCfg->update < 0) pCfg->update = tsUpdate;
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
}
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
......@@ -396,7 +402,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
.walLevel = pCreate->walLevel,
.replications = pCreate->replications,
.quorum = pCreate->quorum,
.update = pCreate->update
.update = pCreate->update,
.cacheLastRow = pCreate->cacheLastRow
};
mnodeSetDefaultDbCfg(&pDb->cfg);
......@@ -605,6 +612,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
strcpy(pSchema[cols].name, "comp");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "cachelast");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
#ifndef __CLOUD_VERSION__
}
#endif
......@@ -750,6 +763,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.compression;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
cols++;
#ifndef __CLOUD_VERSION__
}
#endif
......@@ -864,6 +881,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
int8_t quorum = pAlter->quorum;
int8_t precision = pAlter->precision;
int8_t update = pAlter->update;
int8_t cacheLastRow = pAlter->cacheLastRow;
terrno = TSDB_CODE_SUCCESS;
......@@ -976,6 +994,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
#endif
}
if (cacheLastRow >= 0 && cacheLastRow != pDb->cfg.cacheLastRow) {
mDebug("db:%s, cacheLastRow:%d change to %d", pDb->name, pDb->cfg.cacheLastRow, cacheLastRow);
newCfg.cacheLastRow = cacheLastRow;
}
return newCfg;
}
......
......@@ -863,6 +863,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
pCfg->wals = 3;
pCfg->quorum = pDb->cfg.quorum;
pCfg->update = pDb->cfg.update;
pCfg->cacheLastRow = pDb->cfg.cacheLastRow;
SVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
......
......@@ -120,7 +120,8 @@ typedef struct SCreateDBInfo {
int32_t compressionLevel;
SStrToken precision;
bool ignoreExists;
int8_t update;
int8_t update;
int8_t cachelast;
SArray *keep;
} SCreateDBInfo;
......
此差异已折叠。
......@@ -4662,7 +4662,6 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
// update the query time window
pQuery->window = cond.twindow;
if (pQInfo->tableGroupInfo.numOfTables == 0) {
pQInfo->tableqinfoGroupInfo.numOfTables = 0;
} else {
......
......@@ -846,5 +846,6 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
pDBInfo->keep = NULL;
pDBInfo->update = -1;
pDBInfo->cachelast = 0;
memset(&pDBInfo->precision, 0, sizeof(SStrToken));
}
......@@ -238,6 +238,7 @@ static SKeyword keywordTable[] = {
{"SUM_IRATE", TK_SUM_IRATE},
{"AVG_RATE", TK_AVG_RATE},
{"AVG_IRATE", TK_AVG_IRATE},
{"CACHELAST", TK_CACHELAST},
};
static const char isIdChar[] = {
......
此差异已折叠。
......@@ -66,7 +66,8 @@ typedef struct STable {
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void* eventHandler; // TODO
void* streamHandler; // TODO
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
TSKEY lastKey;
SDataRow lastRow;
char* sql;
void* cqhandle;
SRWLatch latch; // TODO: implementa latch functions
......@@ -360,8 +361,11 @@ typedef struct {
#define TABLE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta);
......@@ -391,7 +395,7 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
if (lock) taosRLockLatch(&(pDTable->latch));
if (lock) TSDB_RLOCK_TABLE(pDTable);
if (version < 0) { // get the latest version of schema
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
} else { // get the schema with version
......@@ -413,7 +417,7 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
}
_exit:
if (lock) taosRUnLockLatch(&(pDTable->latch));
if (lock) TSDB_RUNLOCK_TABLE(pDTable);
return pSchema;
}
......@@ -433,6 +437,11 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
}
}
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow));
return pTable->lastKey;
}
// ------------------ tsdbBuffer.c
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
......
......@@ -225,7 +225,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
taosRLockLatch(&(pIter->pTable->latch));
TSDB_RLOCK_TABLE(pIter->pTable);
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
......@@ -236,7 +236,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
}
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
TSDB_RUNLOCK_TABLE(pIter->pTable);
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno));
......@@ -244,7 +244,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
}
}
taosRUnLockLatch(&(pIter->pTable->latch));
TSDB_RUNLOCK_TABLE(pIter->pTable);
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
......
......@@ -77,9 +77,9 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
tsdbDebug(
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep "
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d",
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d update %d cacheLastRow %d",
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock,
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression, pCfg->update, pCfg->cacheLastRow);
return 0;
}
......@@ -281,6 +281,10 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
config.totalBlocks = pCfg->totalBlocks;
configChanged = true;
}
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
config.cacheLastRow = pCfg->cacheLastRow;
configChanged = true;
}
if (configChanged) {
if (tsdbSaveConfig(pRepo->rootDir, &config) < 0) {
......@@ -475,6 +479,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// update check
if (pCfg->update != 0) pCfg->update = 1;
// update cacheLastRow
if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
return 0;
_err:
......@@ -692,10 +699,12 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
}
}
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
static int tsdbRestoreInfo(STsdbRepo *pRepo) { // TODO
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pFGroup = NULL;
STsdbCfg * pCfg = &(pRepo->config);
SCompBlock *pBlock = NULL;
SFileGroupIter iter;
SRWHelper rhelper = {0};
......@@ -713,7 +722,32 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err;
SCompIdx *pIdx = &(rhelper.curCompIdx);
if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey;
TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable);
if (pIdx->offset > 0 && lastKey < pIdx->maxKey) {
pTable->lastKey = pIdx->maxKey;
if (pCfg->cacheLastRow) { // load the block of data
if (tsdbLoadCompInfo(&rhelper, NULL) < 0) goto _err;
pBlock = rhelper.pCompInfo->blocks + pIdx->numOfBlocks - 1;
if (tsdbLoadBlockData(&rhelper, pBlock, NULL) < 0) goto _err;
// construct the data row
ASSERT(pTable->lastRow == NULL);
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(schemaTLen(pSchema));
if (pTable->lastRow == NULL) {
goto _err;
}
tdInitDataRow(pTable->lastRow, pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = rhelper.pDataCols[0]->cols + icol;
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
pCol->offset);
}
}
}
}
}
......@@ -800,6 +834,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
tlen += taosEncodeFixedI8(buf, pCfg->precision);
tlen += taosEncodeFixedI8(buf, pCfg->compression);
tlen += taosEncodeFixedI8(buf, pCfg->update);
tlen += taosEncodeFixedI8(buf, pCfg->cacheLastRow);
return tlen;
}
......@@ -817,6 +852,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeFixedI8(buf, &(pCfg->precision));
buf = taosDecodeFixedI8(buf, &(pCfg->compression));
buf = taosDecodeFixedI8(buf, &(pCfg->update));
buf = taosDecodeFixedI8(buf, &(pCfg->cacheLastRow));
return buf;
}
......
......@@ -36,6 +36,7 @@ static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPB
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now);
......@@ -663,9 +664,10 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void
return -1;
}
if (key > TABLE_LASTKEY(pTable)) {
TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable);
if (key > lastKey) {
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
REPO_ID(pRepo), key, lastKey);
return 0;
}
}
......@@ -846,8 +848,10 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro
if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]);
pTableData->numOfRows += dsize;
// TODO: impl delete row thing
if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]);
// update table latest info
if (tsdbUpdateTableLatestInfo(pRepo, pTable, rows[rowCounter - 1]) < 0) {
return -1;
}
return 0;
}
......@@ -889,4 +893,38 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
}
}
}
}
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
STsdbCfg *pCfg = &pRepo->config;
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
SDataRow nrow = pTable->lastRow;
if (taosTSizeof(nrow) < dataRowLen(row)) {
SDataRow orow = nrow;
nrow = taosTMalloc(dataRowLen(row));
if (nrow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
dataRowCpy(nrow, row);
TSDB_WLOCK_TABLE(pTable);
pTable->lastKey = dataRowKey(row);
pTable->lastRow = nrow;
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(orow);
} else {
TSDB_WLOCK_TABLE(pTable);
pTable->lastKey = dataRowKey(row);
dataRowCpy(nrow, row);
TSDB_WUNLOCK_TABLE(pTable);
}
} else {
pTable->lastKey = dataRowKey(row);
}
}
return 0;
}
\ No newline at end of file
......@@ -377,11 +377,11 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
// Chage in memory
if (pNewSchema != NULL) { // change super table tag schema
taosWLockLatch(&(pTable->pSuper->latch));
TSDB_WLOCK_TABLE(pTable->pSuper);
STSchema *pOldSchema = pTable->pSuper->tagSchema;
pTable->pSuper->tagSchema = pNewSchema;
tdFreeSchema(pOldSchema);
taosWUnLockLatch(&(pTable->pSuper->latch));
TSDB_WUNLOCK_TABLE(pTable->pSuper);
}
bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
......@@ -392,9 +392,9 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
tsdbWLockRepoMeta(pRepo);
tsdbRemoveTableFromIndex(pMeta, pTable);
}
taosWLockLatch(&(pTable->latch));
TSDB_WLOCK_TABLE(pTable);
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
taosWUnLockLatch(&(pTable->latch));
TSDB_WUNLOCK_TABLE(pTable);
if (isChangeIndexCol) {
tsdbAddTableIntoIndex(pMeta, pTable, false);
tsdbUnlockRepoMeta(pRepo);
......@@ -587,7 +587,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1]));
taosWLockLatch(&(pCTable->latch));
TSDB_WLOCK_TABLE(pCTable);
if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
pCTable->schema[pCTable->numOfSchemas++] = pSchema;
} else {
......@@ -599,7 +599,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
taosWUnLockLatch(&(pCTable->latch));
TSDB_WUNLOCK_TABLE(pCTable);
if (insertAct) {
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable);
......@@ -775,6 +775,7 @@ static void tsdbFreeTable(STable *pTable) {
kvRowFree(pTable->tagVal);
tSkipListDestroy(pTable->pIndex);
taosTZfree(pTable->lastRow);
tfree(pTable->sql);
free(pTable);
}
......
......@@ -110,6 +110,7 @@ typedef struct STsdbQueryHandle {
SArray* pTableCheckInfo; // SArray<STableCheckInfo>
int32_t activeIndex;
bool checkFiles; // check file stage
bool cachelastrow; // check if last row cached
void* qinfo; // query info handle, for debug purpose
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SFileGroup* pFileGroup;
......@@ -133,7 +134,9 @@ typedef struct STableGroupSupporter {
STSchema* pTagSchema;
} STableGroupSupporter;
static STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList);
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
......@@ -370,7 +373,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
}
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) {
pCond->twindow = changeTableGroupByLastrow(groupList);
pCond->twindow = updateLastrowForEachGroup(groupList);
// no qualified table
if (groupList->numOfTables == 0) {
......@@ -378,8 +381,14 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
}
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pMemRef);
int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code;
return NULL;
}
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
return pQueryHandle;
}
......@@ -2144,6 +2153,39 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
// restore the pMemRef
pQueryHandle->pMemRef = pMemRef;
return ret;
} else if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) {
// the last row is cached in buffer, return it directly.
// here note that the pQueryHandle->window must be the TS_INITIALIZER
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
SQueryFilePos* cur = &pQueryHandle->cur;
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
if (++pQueryHandle->activeIndex < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
if (ret != TSDB_CODE_SUCCESS) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
tfree(pRow);
// update the last key value
pCheckInfo->lastKey = key + step;
cur->rows = 1; // only one row
cur->lastKey = key + step;
cur->mixBlock = true;
cur->win.skey = key;
cur->win.ekey = key;
return true;
}
return false;
}
if (pQueryHandle->checkFiles) {
......@@ -2176,7 +2218,57 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return ret;
}
STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
/*
* 1. no data at all (pTable->lastKey = TSKEY_INITIAL_VAL), just return TSKEY_INITIAL_VAL
* 2. has data but not loaded, just return lastKey but not set pRes
* 3. has data and loaded, return lastKey and set pRes
*/
int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) {
TSDB_RLOCK_TABLE(pTable);
*lastKey = pTable->lastKey;
if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) {
*pRes = tdDataRowDup(pTable->lastRow);
if (*pRes == NULL) {
TSDB_RUNLOCK_TABLE(pTable);
return TSDB_CODE_TDB_OUT_OF_MEMORY;
}
}
TSDB_RUNLOCK_TABLE(pTable);
return TSDB_CODE_SUCCESS;
}
int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) {
assert(pQueryHandle != NULL && groupList != NULL);
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
SArray* group = taosArrayGetP(groupList->pGroupList, 0);
assert(group != NULL);
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
int32_t code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key);
if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->cachelastrow = false;
} else {
pQueryHandle->cachelastrow = (pRow != NULL);
}
// update the tsdb query time range
if (pQueryHandle->cachelastrow) {
pQueryHandle->window = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = false;
pQueryHandle->activeIndex = -1; // start from -1
}
tfree(pRow);
return code;
}
STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
STimeWindow window = {INT64_MAX, INT64_MIN};
int32_t totalNumOfTable = 0;
......@@ -2191,16 +2283,16 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
size_t numOfTables = taosArrayGetSize(pGroup);
for(int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);
STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);
// if the lastKey equals to INT64_MIN, there is no data in this table
TSKEY lastKey = ((STable*)(pKeyInfo->pTable))->lastKey;
TSKEY lastKey = ((STable*)(pInfo->pTable))->lastKey;
if (key < lastKey) {
key = lastKey;
keyInfo.pTable = pKeyInfo->pTable;
keyInfo.pTable = pInfo->pTable;
keyInfo.lastKey = key;
pKeyInfo->lastKey = key;
pInfo->lastKey = key;
if (key < window.skey) {
window.skey = key;
......@@ -2214,11 +2306,11 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
// clear current group, unref unused table
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
// keyInfo.pTable may be NULL here.
if (pKeyInfo->pTable != keyInfo.pTable) {
tsdbUnRefTable(pKeyInfo->pTable);
if (pInfo->pTable != keyInfo.pTable) {
tsdbUnRefTable(pInfo->pTable);
}
}
......
......@@ -34,6 +34,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock;
pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision;
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
pVnode->tsdbCfg.cacheLastRow = vnodeMsg->cfg.cacheLastRow;
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP;
......@@ -216,6 +217,15 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
vnodeMsg.cfg.quorum = (int8_t)quorum->valueint;
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
//goto PARSE_VCFG_ERROR;
vnodeMsg.cfg.cacheLastRow = 0;
} else {
vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint;
}
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file);
......@@ -304,6 +314,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pMsg->cfg.replications; i++) {
SVnodeDesc *node = &pMsg->nodes[i];
......
......@@ -86,6 +86,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression;
tsdbCfg.update = pVnodeCfg->cfg.update;
tsdbCfg.cacheLastRow = pVnodeCfg->cfg.cacheLastRow;
char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
......
......@@ -81,7 +81,7 @@ print =============== step2 - no db
#11
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:7111/rest/sql
print 11-> $system_content
if $system_content != @{"status":"succ","head":["name","created_time","ntables","vgroups","replica","quorum","days","keep1,keep2,keep(D)","cache(MB)","blocks","minrows","maxrows","wallevel","fsync","comp","precision","update","status"],"data":[],"rows":0}@ then
if $system_content != @{"status":"succ","head":["name","created_time","ntables","vgroups","replica","quorum","days","keep1,keep2,keep(D)","cache(MB)","blocks","minrows","maxrows","wallevel","fsync","comp","cachelast","precision","update","status"],"data":[],"rows":0}@ then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册