提交 4db67d36 编写于 作者: H Hongze Cheng

Merge branch 'refact/tsdb_optimize' of https://github.com/taosdata/TDengine...

Merge branch 'refact/tsdb_optimize' of https://github.com/taosdata/TDengine into refact/tsdb_new_format
......@@ -103,224 +103,226 @@
#define TK_WAL_ROLL_PERIOD 85
#define TK_WAL_SEGMENT_SIZE 86
#define TK_SST_TRIGGER 87
#define TK_NK_COLON 88
#define TK_TABLE 89
#define TK_NK_LP 90
#define TK_NK_RP 91
#define TK_STABLE 92
#define TK_ADD 93
#define TK_COLUMN 94
#define TK_MODIFY 95
#define TK_RENAME 96
#define TK_TAG 97
#define TK_SET 98
#define TK_NK_EQ 99
#define TK_USING 100
#define TK_TAGS 101
#define TK_COMMENT 102
#define TK_BOOL 103
#define TK_TINYINT 104
#define TK_SMALLINT 105
#define TK_INT 106
#define TK_INTEGER 107
#define TK_BIGINT 108
#define TK_FLOAT 109
#define TK_DOUBLE 110
#define TK_BINARY 111
#define TK_TIMESTAMP 112
#define TK_NCHAR 113
#define TK_UNSIGNED 114
#define TK_JSON 115
#define TK_VARCHAR 116
#define TK_MEDIUMBLOB 117
#define TK_BLOB 118
#define TK_VARBINARY 119
#define TK_DECIMAL 120
#define TK_MAX_DELAY 121
#define TK_WATERMARK 122
#define TK_ROLLUP 123
#define TK_TTL 124
#define TK_SMA 125
#define TK_FIRST 126
#define TK_LAST 127
#define TK_SHOW 128
#define TK_DATABASES 129
#define TK_TABLES 130
#define TK_STABLES 131
#define TK_MNODES 132
#define TK_MODULES 133
#define TK_QNODES 134
#define TK_FUNCTIONS 135
#define TK_INDEXES 136
#define TK_ACCOUNTS 137
#define TK_APPS 138
#define TK_CONNECTIONS 139
#define TK_LICENCES 140
#define TK_GRANTS 141
#define TK_QUERIES 142
#define TK_SCORES 143
#define TK_TOPICS 144
#define TK_VARIABLES 145
#define TK_BNODES 146
#define TK_SNODES 147
#define TK_CLUSTER 148
#define TK_TRANSACTIONS 149
#define TK_DISTRIBUTED 150
#define TK_CONSUMERS 151
#define TK_SUBSCRIPTIONS 152
#define TK_VNODES 153
#define TK_LIKE 154
#define TK_INDEX 155
#define TK_FUNCTION 156
#define TK_INTERVAL 157
#define TK_TOPIC 158
#define TK_AS 159
#define TK_WITH 160
#define TK_META 161
#define TK_CONSUMER 162
#define TK_GROUP 163
#define TK_DESC 164
#define TK_DESCRIBE 165
#define TK_RESET 166
#define TK_QUERY 167
#define TK_CACHE 168
#define TK_EXPLAIN 169
#define TK_ANALYZE 170
#define TK_VERBOSE 171
#define TK_NK_BOOL 172
#define TK_RATIO 173
#define TK_NK_FLOAT 174
#define TK_OUTPUTTYPE 175
#define TK_AGGREGATE 176
#define TK_BUFSIZE 177
#define TK_STREAM 178
#define TK_INTO 179
#define TK_TRIGGER 180
#define TK_AT_ONCE 181
#define TK_WINDOW_CLOSE 182
#define TK_IGNORE 183
#define TK_EXPIRED 184
#define TK_KILL 185
#define TK_CONNECTION 186
#define TK_TRANSACTION 187
#define TK_BALANCE 188
#define TK_VGROUP 189
#define TK_MERGE 190
#define TK_REDISTRIBUTE 191
#define TK_SPLIT 192
#define TK_DELETE 193
#define TK_INSERT 194
#define TK_NULL 195
#define TK_NK_QUESTION 196
#define TK_NK_ARROW 197
#define TK_ROWTS 198
#define TK_TBNAME 199
#define TK_QSTART 200
#define TK_QEND 201
#define TK_QDURATION 202
#define TK_WSTART 203
#define TK_WEND 204
#define TK_WDURATION 205
#define TK_CAST 206
#define TK_NOW 207
#define TK_TODAY 208
#define TK_TIMEZONE 209
#define TK_CLIENT_VERSION 210
#define TK_SERVER_VERSION 211
#define TK_SERVER_STATUS 212
#define TK_CURRENT_USER 213
#define TK_COUNT 214
#define TK_LAST_ROW 215
#define TK_BETWEEN 216
#define TK_IS 217
#define TK_NK_LT 218
#define TK_NK_GT 219
#define TK_NK_LE 220
#define TK_NK_GE 221
#define TK_NK_NE 222
#define TK_MATCH 223
#define TK_NMATCH 224
#define TK_CONTAINS 225
#define TK_IN 226
#define TK_JOIN 227
#define TK_INNER 228
#define TK_SELECT 229
#define TK_DISTINCT 230
#define TK_WHERE 231
#define TK_PARTITION 232
#define TK_BY 233
#define TK_SESSION 234
#define TK_STATE_WINDOW 235
#define TK_SLIDING 236
#define TK_FILL 237
#define TK_VALUE 238
#define TK_NONE 239
#define TK_PREV 240
#define TK_LINEAR 241
#define TK_NEXT 242
#define TK_HAVING 243
#define TK_RANGE 244
#define TK_EVERY 245
#define TK_ORDER 246
#define TK_SLIMIT 247
#define TK_SOFFSET 248
#define TK_LIMIT 249
#define TK_OFFSET 250
#define TK_ASC 251
#define TK_NULLS 252
#define TK_ABORT 253
#define TK_AFTER 254
#define TK_ATTACH 255
#define TK_BEFORE 256
#define TK_BEGIN 257
#define TK_BITAND 258
#define TK_BITNOT 259
#define TK_BITOR 260
#define TK_BLOCKS 261
#define TK_CHANGE 262
#define TK_COMMA 263
#define TK_COMPACT 264
#define TK_CONCAT 265
#define TK_CONFLICT 266
#define TK_COPY 267
#define TK_DEFERRED 268
#define TK_DELIMITERS 269
#define TK_DETACH 270
#define TK_DIVIDE 271
#define TK_DOT 272
#define TK_EACH 273
#define TK_END 274
#define TK_FAIL 275
#define TK_FILE 276
#define TK_FOR 277
#define TK_GLOB 278
#define TK_ID 279
#define TK_IMMEDIATE 280
#define TK_IMPORT 281
#define TK_INITIALLY 282
#define TK_INSTEAD 283
#define TK_ISNULL 284
#define TK_KEY 285
#define TK_NK_BITNOT 286
#define TK_NK_SEMI 287
#define TK_NOTNULL 288
#define TK_OF 289
#define TK_PLUS 290
#define TK_PRIVILEGE 291
#define TK_RAISE 292
#define TK_REPLACE 293
#define TK_RESTRICT 294
#define TK_ROW 295
#define TK_SEMI 296
#define TK_STAR 297
#define TK_STATEMENT 298
#define TK_STRING 299
#define TK_TIMES 300
#define TK_UPDATE 301
#define TK_VALUES 302
#define TK_VARIABLE 303
#define TK_VIEW 304
#define TK_WAL 305
#define TK_TABLE_PREFIX 88
#define TK_TABLE_SUFFIX 89
#define TK_NK_COLON 90
#define TK_TABLE 91
#define TK_NK_LP 92
#define TK_NK_RP 93
#define TK_STABLE 94
#define TK_ADD 95
#define TK_COLUMN 96
#define TK_MODIFY 97
#define TK_RENAME 98
#define TK_TAG 99
#define TK_SET 100
#define TK_NK_EQ 101
#define TK_USING 102
#define TK_TAGS 103
#define TK_COMMENT 104
#define TK_BOOL 105
#define TK_TINYINT 106
#define TK_SMALLINT 107
#define TK_INT 108
#define TK_INTEGER 109
#define TK_BIGINT 110
#define TK_FLOAT 111
#define TK_DOUBLE 112
#define TK_BINARY 113
#define TK_TIMESTAMP 114
#define TK_NCHAR 115
#define TK_UNSIGNED 116
#define TK_JSON 117
#define TK_VARCHAR 118
#define TK_MEDIUMBLOB 119
#define TK_BLOB 120
#define TK_VARBINARY 121
#define TK_DECIMAL 122
#define TK_MAX_DELAY 123
#define TK_WATERMARK 124
#define TK_ROLLUP 125
#define TK_TTL 126
#define TK_SMA 127
#define TK_FIRST 128
#define TK_LAST 129
#define TK_SHOW 130
#define TK_DATABASES 131
#define TK_TABLES 132
#define TK_STABLES 133
#define TK_MNODES 134
#define TK_MODULES 135
#define TK_QNODES 136
#define TK_FUNCTIONS 137
#define TK_INDEXES 138
#define TK_ACCOUNTS 139
#define TK_APPS 140
#define TK_CONNECTIONS 141
#define TK_LICENCES 142
#define TK_GRANTS 143
#define TK_QUERIES 144
#define TK_SCORES 145
#define TK_TOPICS 146
#define TK_VARIABLES 147
#define TK_BNODES 148
#define TK_SNODES 149
#define TK_CLUSTER 150
#define TK_TRANSACTIONS 151
#define TK_DISTRIBUTED 152
#define TK_CONSUMERS 153
#define TK_SUBSCRIPTIONS 154
#define TK_VNODES 155
#define TK_LIKE 156
#define TK_INDEX 157
#define TK_FUNCTION 158
#define TK_INTERVAL 159
#define TK_TOPIC 160
#define TK_AS 161
#define TK_WITH 162
#define TK_META 163
#define TK_CONSUMER 164
#define TK_GROUP 165
#define TK_DESC 166
#define TK_DESCRIBE 167
#define TK_RESET 168
#define TK_QUERY 169
#define TK_CACHE 170
#define TK_EXPLAIN 171
#define TK_ANALYZE 172
#define TK_VERBOSE 173
#define TK_NK_BOOL 174
#define TK_RATIO 175
#define TK_NK_FLOAT 176
#define TK_OUTPUTTYPE 177
#define TK_AGGREGATE 178
#define TK_BUFSIZE 179
#define TK_STREAM 180
#define TK_INTO 181
#define TK_TRIGGER 182
#define TK_AT_ONCE 183
#define TK_WINDOW_CLOSE 184
#define TK_IGNORE 185
#define TK_EXPIRED 186
#define TK_KILL 187
#define TK_CONNECTION 188
#define TK_TRANSACTION 189
#define TK_BALANCE 190
#define TK_VGROUP 191
#define TK_MERGE 192
#define TK_REDISTRIBUTE 193
#define TK_SPLIT 194
#define TK_DELETE 195
#define TK_INSERT 196
#define TK_NULL 197
#define TK_NK_QUESTION 198
#define TK_NK_ARROW 199
#define TK_ROWTS 200
#define TK_TBNAME 201
#define TK_QSTART 202
#define TK_QEND 203
#define TK_QDURATION 204
#define TK_WSTART 205
#define TK_WEND 206
#define TK_WDURATION 207
#define TK_CAST 208
#define TK_NOW 209
#define TK_TODAY 210
#define TK_TIMEZONE 211
#define TK_CLIENT_VERSION 212
#define TK_SERVER_VERSION 213
#define TK_SERVER_STATUS 214
#define TK_CURRENT_USER 215
#define TK_COUNT 216
#define TK_LAST_ROW 217
#define TK_BETWEEN 218
#define TK_IS 219
#define TK_NK_LT 220
#define TK_NK_GT 221
#define TK_NK_LE 222
#define TK_NK_GE 223
#define TK_NK_NE 224
#define TK_MATCH 225
#define TK_NMATCH 226
#define TK_CONTAINS 227
#define TK_IN 228
#define TK_JOIN 229
#define TK_INNER 230
#define TK_SELECT 231
#define TK_DISTINCT 232
#define TK_WHERE 233
#define TK_PARTITION 234
#define TK_BY 235
#define TK_SESSION 236
#define TK_STATE_WINDOW 237
#define TK_SLIDING 238
#define TK_FILL 239
#define TK_VALUE 240
#define TK_NONE 241
#define TK_PREV 242
#define TK_LINEAR 243
#define TK_NEXT 244
#define TK_HAVING 245
#define TK_RANGE 246
#define TK_EVERY 247
#define TK_ORDER 248
#define TK_SLIMIT 249
#define TK_SOFFSET 250
#define TK_LIMIT 251
#define TK_OFFSET 252
#define TK_ASC 253
#define TK_NULLS 254
#define TK_ABORT 255
#define TK_AFTER 256
#define TK_ATTACH 257
#define TK_BEFORE 258
#define TK_BEGIN 259
#define TK_BITAND 260
#define TK_BITNOT 261
#define TK_BITOR 262
#define TK_BLOCKS 263
#define TK_CHANGE 264
#define TK_COMMA 265
#define TK_COMPACT 266
#define TK_CONCAT 267
#define TK_CONFLICT 268
#define TK_COPY 269
#define TK_DEFERRED 270
#define TK_DELIMITERS 271
#define TK_DETACH 272
#define TK_DIVIDE 273
#define TK_DOT 274
#define TK_EACH 275
#define TK_END 276
#define TK_FAIL 277
#define TK_FILE 278
#define TK_FOR 279
#define TK_GLOB 280
#define TK_ID 281
#define TK_IMMEDIATE 282
#define TK_IMPORT 283
#define TK_INITIALLY 284
#define TK_INSTEAD 285
#define TK_ISNULL 286
#define TK_KEY 287
#define TK_NK_BITNOT 288
#define TK_NK_SEMI 289
#define TK_NOTNULL 290
#define TK_OF 291
#define TK_PLUS 292
#define TK_PRIVILEGE 293
#define TK_RAISE 294
#define TK_REPLACE 295
#define TK_RESTRICT 296
#define TK_ROW 297
#define TK_SEMI 298
#define TK_STAR 299
#define TK_STATEMENT 300
#define TK_STRING 301
#define TK_TIMES 302
#define TK_UPDATE 303
#define TK_VALUES 304
#define TK_VARIABLE 305
#define TK_VIEW 306
#define TK_WAL 307
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -82,6 +82,8 @@ typedef struct SDatabaseOptions {
bool walRetentionSizeIsSet;
bool walRollPeriodIsSet;
int32_t sstTrigger;
int32_t tablePrefix;
int32_t tableSuffix;
} SDatabaseOptions;
typedef struct SCreateDatabaseStmt {
......
......@@ -766,22 +766,23 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosArrayPush(pRequest->tableList, &pName);
// change tag cid to new cid
if(pCreateReq->type == TSDB_CHILD_TABLE){
if (pCreateReq->type == TSDB_CHILD_TABLE) {
STableMeta* pTableMeta = NULL;
SName sName = {0};
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.name, &sName);
code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
if(code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.name);
goto end;
}
for(int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++){
for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
for(int32_t j = pTableMeta->tableInfo.numOfColumns; j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++){
SSchema *tag = &pTableMeta->schema[j];
if(strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON){
tTagSetCid((STag *)pCreateReq->ctb.pTag, i, tag->colId);
for (int32_t j = pTableMeta->tableInfo.numOfColumns;
j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
SSchema* tag = &pTableMeta->schema[j];
if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
tTagSetCid((STag*)pCreateReq->ctb.pTag, i, tag->colId);
}
}
}
......@@ -1322,7 +1323,7 @@ end:
return code;
}
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL;
SQuery* pQuery = NULL;
......@@ -1405,7 +1406,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST){
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
code = TSDB_CODE_SUCCESS;
continue;
......@@ -1466,7 +1467,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
}
// pSW->pSchema should be same as pTableMeta->schema
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
uint64_t uid = pTableMeta->uid;
int16_t sver = pTableMeta->sversion;
......@@ -1495,9 +1496,9 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if(!index){
if (!index) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}else{
} else {
char* colData = rspObj.resInfo.row[*index];
if (!colData) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
......@@ -1668,7 +1669,7 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
} else if (raw.raw_type == TDMT_VND_DELETE) {
return taosDeleteData(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == RES_TYPE__TMQ) {
return tmqWriteRaw(taos, raw.raw, raw.raw_len);
return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len);
}
return TSDB_CODE_INVALID_PARA;
}
......@@ -1552,6 +1552,24 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
STR_WITH_MAXSIZE_TO_VARSTR(buf, "NULL", bytes);
}
const char *precStr = NULL;
switch (pDb->cfg.precision) {
case TSDB_TIME_PRECISION_MILLI:
precStr = TSDB_TIME_PRECISION_MILLI_STR;
break;
case TSDB_TIME_PRECISION_MICRO:
precStr = TSDB_TIME_PRECISION_MICRO_STR;
break;
case TSDB_TIME_PRECISION_NANO:
precStr = TSDB_TIME_PRECISION_NANO_STR;
break;
default:
precStr = "none";
break;
}
char precVstr[10] = {0};
STR_WITH_SIZE_TO_VARSTR(precVstr, precStr, 2);
char *statusStr = "ready";
if (objStatus == SDB_STATUS_CREATING) {
statusStr = "creating";
......@@ -1562,7 +1580,6 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
statusStr = "unsynced";
}
}
char statusVstr[24] = {0};
STR_WITH_SIZE_TO_VARSTR(statusVstr, statusStr, strlen(statusStr));
......@@ -1573,6 +1590,8 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
colDataAppend(pColInfo, rows, buf, false);
} else if (i == 3) {
colDataAppend(pColInfo, rows, (const char *)&numOfTables, false);
} else if (i == 14) {
colDataAppend(pColInfo, rows, precVstr, false);
} else if (i == 15) {
colDataAppend(pColInfo, rows, statusVstr, false);
} else {
......@@ -1637,23 +1656,6 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.compression, false);
const char *precStr = NULL;
switch (pDb->cfg.precision) {
case TSDB_TIME_PRECISION_MILLI:
precStr = TSDB_TIME_PRECISION_MILLI_STR;
break;
case TSDB_TIME_PRECISION_MICRO:
precStr = TSDB_TIME_PRECISION_MICRO_STR;
break;
case TSDB_TIME_PRECISION_NANO:
precStr = TSDB_TIME_PRECISION_NANO_STR;
break;
default:
precStr = "none";
break;
}
char precVstr[10] = {0};
STR_WITH_SIZE_TO_VARSTR(precVstr, precStr, 2);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)precVstr, false);
......
......@@ -24,7 +24,8 @@ struct SLDataIter {
int8_t backward;
SArray *aSstBlk;
int32_t iSstBlk;
SBlockData bData;
SBlockData bData[2];
int32_t loadIndex;
int32_t iRow;
SRowInfo rInfo;
uint64_t uid;
......@@ -32,6 +33,15 @@ struct SLDataIter {
SVersionRange verRange;
};
static SBlockData* getCurrentBlock(SLDataIter* pIter) {
return &pIter->bData[pIter->loadIndex];
}
static SBlockData* getNextBlock(SLDataIter* pIter) {
pIter->loadIndex ^= 1;
return getCurrentBlock(pIter);
}
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pRange) {
int32_t code = 0;
......@@ -53,7 +63,12 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
goto _exit;
}
code = tBlockDataCreate(&(*pIter)->bData);
code = tBlockDataCreate(&(*pIter)->bData[0]);
if (code) {
goto _exit;
}
code = tBlockDataCreate(&(*pIter)->bData[1]);
if (code) {
goto _exit;
}
......@@ -95,7 +110,8 @@ _exit:
}
void tLDataIterClose(SLDataIter *pIter) {
tBlockDataDestroy(&pIter->bData, 1);
tBlockDataDestroy(&pIter->bData[0], 1);
tBlockDataDestroy(&pIter->bData[1], 1);
taosArrayDestroy(pIter->aSstBlk);
taosMemoryFree(pIter);
}
......@@ -136,24 +152,26 @@ static void findNextValidRow(SLDataIter *pIter) {
bool hasVal = false;
int32_t i = pIter->iRow;
for (; i < pIter->bData.nRow && i >= 0; i += step) {
if (pIter->bData.aUid != NULL) {
SBlockData* pBlockData = getCurrentBlock(pIter);
for (; i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid != NULL) {
if (!pIter->backward) {
if (pIter->bData.aUid[i] < pIter->uid) {
if (pBlockData->aUid[i] < pIter->uid) {
continue;
} else if (pIter->bData.aUid[i] > pIter->uid) {
} else if (pBlockData->aUid[i] > pIter->uid) {
break;
}
} else {
if (pIter->bData.aUid[i] > pIter->uid) {
if (pBlockData->aUid[i] > pIter->uid) {
continue;
} else if (pIter->bData.aUid[i] < pIter->uid) {
} else if (pBlockData->aUid[i] < pIter->uid) {
break;
}
}
}
int64_t ts = pIter->bData.aTSKEY[i];
int64_t ts = pBlockData->aTSKEY[i];
if (!pIter->backward) { // asc
if (ts > pIter->timeWindow.ekey) { // no more data
break;
......@@ -168,7 +186,7 @@ static void findNextValidRow(SLDataIter *pIter) {
}
}
int64_t ver = pIter->bData.aVersion[i];
int64_t ver = pBlockData->aVersion[i];
if (ver < pIter->verRange.minVer) {
continue;
}
......@@ -203,14 +221,16 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
}
int32_t iBlockL = pIter->iSstBlk;
SBlockData* pBlockData = getCurrentBlock(pIter);
if (pIter->bData.nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData);
if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
pBlockData = getNextBlock(pIter);
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
pIter->iRow = (pIter->backward) ? pIter->bData.nRow : -1;
pIter->iRow = (pIter->backward) ? pBlockData->nRow : -1;
}
pIter->iRow += step;
......@@ -218,7 +238,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
while (1) {
findNextValidRow(pIter);
if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) {
if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
tLDataIterNextBlock(pIter);
if (pIter->pSstBlk == NULL) { // no more data
goto _exit;
......@@ -228,17 +248,18 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
}
if (iBlockL != pIter->iSstBlk) {
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData);
pBlockData = getNextBlock(pIter);
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
if (code) {
goto _exit;
}
pIter->iRow = pIter->backward ? (pIter->bData.nRow - 1) : 0;
pIter->iRow = pIter->backward ? (pBlockData->nRow - 1) : 0;
}
}
pIter->rInfo.suid = pIter->bData.suid;
pIter->rInfo.uid = pIter->bData.uid;
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
pIter->rInfo.suid = pBlockData->suid;
pIter->rInfo.uid = pBlockData->uid;
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
_exit:
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -337,12 +337,15 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
return code;
}
}
SLastBlockReader* pLReader = pIter->pLastBlockReader;
pLReader->order = pReader->order;
pLReader->window = pReader->window;
pLReader->verRange = pReader->verRange;
}
pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree);
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
return TSDB_CODE_SUCCESS;
......@@ -1375,6 +1378,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
// SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL;
......@@ -1385,7 +1390,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
// merge with block data if ts == key
if (mergeBlockData) {
if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
......
......@@ -468,9 +468,13 @@ static int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
STSchema *pTSchema = metaGetTbTSchema(pVnode->pMeta, suid, -1);
// metaGetTbTSchemaEx(pVnode->pMeta, suid, suid, -1, &pTSchema);
if (pTSchema) {
*num = pTSchema->numOfCols;
taosMemoryFree(pTSchema);
} else {
*num = 2;
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -60,7 +60,9 @@ typedef enum EDatabaseOptionType {
DB_OPTION_WAL_RETENTION_SIZE,
DB_OPTION_WAL_ROLL_PERIOD,
DB_OPTION_WAL_SEGMENT_SIZE,
DB_OPTION_SST_TRIGGER
DB_OPTION_SST_TRIGGER,
DB_OPTION_TABLE_PREFIX,
DB_OPTION_TABLE_SUFFIX
} EDatabaseOptionType;
typedef enum ETableOptionType {
......
......@@ -208,6 +208,8 @@ db_options(A) ::= db_options(B) WAL_RETENTION_SIZE NK_MINUS(D) NK_INTEGER(C).
db_options(A) ::= db_options(B) WAL_ROLL_PERIOD NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_WAL_ROLL_PERIOD, &C); }
db_options(A) ::= db_options(B) WAL_SEGMENT_SIZE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_WAL_SEGMENT_SIZE, &C); }
db_options(A) ::= db_options(B) SST_TRIGGER NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_SST_TRIGGER, &C); }
db_options(A) ::= db_options(B) TABLE_PREFIX NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_TABLE_PREFIX, &C); }
db_options(A) ::= db_options(B) TABLE_SUFFIX NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_TABLE_SUFFIX, &C); }
alter_db_options(A) ::= alter_db_option(B). { A = createAlterDatabaseOptions(pCxt); A = setAlterDatabaseOption(pCxt, A, &B); }
alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setAlterDatabaseOption(pCxt, B, &C); }
......
......@@ -836,6 +836,8 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
updateWalOptionsDefault(pOptions);
pOptions->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
pOptions->sstTrigger = TSDB_DEFAULT_SST_TRIGGER;
pOptions->tablePrefix = TSDB_DEFAULT_HASH_PREFIX;
pOptions->tableSuffix = TSDB_DEFAULT_HASH_SUFFIX;
return (SNode*)pOptions;
}
......@@ -868,6 +870,8 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->walRollPeriod = -1;
pOptions->walSegmentSize = -1;
pOptions->sstTrigger = -1;
pOptions->tablePrefix = -1;
pOptions->tableSuffix = -1;
return (SNode*)pOptions;
}
......@@ -954,6 +958,12 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti
case DB_OPTION_SST_TRIGGER:
pDbOptions->sstTrigger = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_TABLE_PREFIX:
pDbOptions->tablePrefix = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_TABLE_SUFFIX:
pDbOptions->tableSuffix = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
default:
break;
}
......
......@@ -200,6 +200,8 @@ static SKeyword keywordTable[] = {
{"SYSINFO", TK_SYSINFO},
{"TABLE", TK_TABLE},
{"TABLES", TK_TABLES},
{"TABLE_PREFIX", TK_TABLE_PREFIX},
{"TABLE_SUFFIX", TK_TABLE_SUFFIX},
{"TAG", TK_TAG},
{"TAGS", TK_TAGS},
{"TBNAME", TK_TBNAME},
......
......@@ -3483,6 +3483,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq->walRollPeriod = pStmt->pOptions->walRollPeriod;
pReq->walSegmentSize = pStmt->pOptions->walSegmentSize;
pReq->sstTrigger = pStmt->pOptions->sstTrigger;
pReq->hashPrefix = pStmt->pOptions->tablePrefix;
pReq->hashSuffix = pStmt->pOptions->tableSuffix;
pReq->ignoreExist = pStmt->ignoreExists;
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
}
......@@ -3770,6 +3772,12 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "sstTrigger", pOptions->sstTrigger, TSDB_MIN_SST_TRIGGER, TSDB_MAX_SST_TRIGGER);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "tablePrefix", pOptions->tablePrefix, TSDB_MIN_HASH_PREFIX, TSDB_MAX_HASH_PREFIX);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "tableSuffix", pOptions->tableSuffix, TSDB_MIN_HASH_SUFFIX, TSDB_MAX_HASH_SUFFIX);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkOptionsDependency(pCxt, pDbName, pOptions);
}
......@@ -5856,7 +5864,7 @@ static int32_t rewriteShowVnodes(STranslateContext* pCxt, SQuery* pQuery) {
if (NULL != pShow->pDnodeId) {
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_id", pShow->pDnodeId, &pStmt->pWhere);
} else {
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_endpoint", pShow->pDnodeEndpoint, &pStmt->pWhere);
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_ep", pShow->pDnodeEndpoint, &pStmt->pWhere);
}
}
if (TSDB_CODE_SUCCESS == code) {
......
此差异已折叠。
......@@ -83,7 +83,7 @@ void generateInformationSchema(MockCatalogService* mcs) {
.done();
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VNODES, TSDB_SYSTEM_TABLE, 2)
.addColumn("dnode_id", TSDB_DATA_TYPE_INT)
.addColumn("dnode_endpoint", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN)
.addColumn("dnode_ep", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN)
.done();
}
......
......@@ -116,6 +116,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
expect.walRollPeriod = TSDB_REP_DEF_DB_WAL_ROLL_PERIOD;
expect.walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
expect.sstTrigger = TSDB_DEFAULT_SST_TRIGGER;
expect.hashPrefix = TSDB_DEFAULT_HASH_PREFIX;
expect.hashSuffix = TSDB_DEFAULT_HASH_SUFFIX;
};
auto setDbBufferFunc = [&](int32_t buffer) { expect.buffer = buffer; };
......@@ -157,6 +159,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
auto setDbWalRollPeriod = [&](int32_t walRollPeriod) { expect.walRollPeriod = walRollPeriod; };
auto setDbWalSegmentSize = [&](int32_t walSegmentSize) { expect.walSegmentSize = walSegmentSize; };
auto setDbSstTrigger = [&](int32_t sstTrigger) { expect.sstTrigger = sstTrigger; };
auto setDbHashPrefix = [&](int32_t hashPrefix) { expect.hashPrefix = hashPrefix; };
auto setDbHashSuffix = [&](int32_t hashSuffix) { expect.hashSuffix = hashSuffix; };
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_DATABASE_STMT);
......@@ -188,6 +192,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
ASSERT_EQ(req.walRollPeriod, expect.walRollPeriod);
ASSERT_EQ(req.walSegmentSize, expect.walSegmentSize);
ASSERT_EQ(req.sstTrigger, expect.sstTrigger);
ASSERT_EQ(req.hashPrefix, expect.hashPrefix);
ASSERT_EQ(req.hashSuffix, expect.hashSuffix);
ASSERT_EQ(req.ignoreExist, expect.ignoreExist);
ASSERT_EQ(req.numOfRetensions, expect.numOfRetensions);
if (expect.numOfRetensions > 0) {
......@@ -236,6 +242,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
setDbWalRollPeriod(10);
setDbWalSegmentSize(20);
setDbSstTrigger(16);
setDbHashPrefix(3);
setDbHashSuffix(4);
run("CREATE DATABASE IF NOT EXISTS wxy_db "
"BUFFER 64 "
"CACHEMODEL 'last_value' "
......@@ -260,7 +268,9 @@ TEST_F(ParserInitialCTest, createDatabase) {
"WAL_RETENTION_SIZE -1 "
"WAL_ROLL_PERIOD 10 "
"WAL_SEGMENT_SIZE 20 "
"SST_TRIGGER 16");
"SST_TRIGGER 16 "
"TABLE_PREFIX 3"
"TABLE_SUFFIX 4");
clearCreateDbReq();
setCreateDbReqFunc("wxy_db", 1);
......
......@@ -133,10 +133,10 @@ if $data(4)[5] != localhost:7100 then
return -1
endi
#sql show vnodes 'localhost:7100'
#if $rows != 9 then
# return -1
#endi
sql show vnodes 'localhost:7100'
if $rows != 9 then
return -1
endi
print =============== drop database
sql drop database d2
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册