提交 67461d92 编写于 作者: K kailixu

chore: merge 3.0 and solve conflict

...@@ -51,6 +51,11 @@ DESCRIBE [db_name.]stb_name; ...@@ -51,6 +51,11 @@ DESCRIBE [db_name.]stb_name;
### View tag information for all child tables in the supertable ### View tag information for all child tables in the supertable
```
SHOW TABLE TAGS FROM table_name [FROM db_name];
SHOW TABLE TAGS FROM [db_name.]table_name;
```
``` ```
taos> SHOW TABLE TAGS FROM st1; taos> SHOW TABLE TAGS FROM st1;
tbname | id | loc | tbname | id | loc |
......
...@@ -101,6 +101,7 @@ Note: TDengine Enterprise Edition only. ...@@ -101,6 +101,7 @@ Note: TDengine Enterprise Edition only.
```sql ```sql
SHOW INDEXES FROM tbl_name [FROM db_name]; SHOW INDEXES FROM tbl_name [FROM db_name];
SHOW INDEXES FROM [db_name.]tbl_name;
``` ```
Shows indices that have been created. Shows indices that have been created.
...@@ -326,6 +327,7 @@ Note that only the information about the data blocks in the data file will be di ...@@ -326,6 +327,7 @@ Note that only the information about the data blocks in the data file will be di
```sql ```sql
SHOW TAGS FROM child_table_name [FROM db_name]; SHOW TAGS FROM child_table_name [FROM db_name];
SHOW TAGS FROM [db_name.]child_table_name;
``` ```
Shows all tag information in a subtable. Shows all tag information in a subtable.
......
...@@ -43,6 +43,7 @@ DROP INDEX index_name; ...@@ -43,6 +43,7 @@ DROP INDEX index_name;
````sql ````sql
```sql ```sql
SHOW INDEXES FROM tbl_name [FROM db_name]; SHOW INDEXES FROM tbl_name [FROM db_name];
SHOW INDEXES FROM [db_name.]tbl_name ;
```` ````
Shows indices that have been created for the specified database or table. Shows indices that have been created for the specified database or table.
...@@ -36,8 +36,8 @@ REST connection supports all platforms that can run Java. ...@@ -36,8 +36,8 @@ REST connection supports all platforms that can run Java.
| taos-jdbcdriver version | major changes | TDengine version | | taos-jdbcdriver version | major changes | TDengine version |
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: | | :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | 3.0.5.0 or later | | 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | - |
| 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later | | 3.2.3 | Fixed resultSet data parsing failure in some cases | - |
| 3.2.2 | Subscription add seek function | 3.0.5.0 or later | | 3.2.2 | Subscription add seek function | 3.0.5.0 or later |
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later | | 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later |
| 3.2.0 | This version has been deprecated | - | | 3.2.0 | This version has been deprecated | - |
...@@ -1019,11 +1019,13 @@ while(true) { ...@@ -1019,11 +1019,13 @@ while(true) {
#### Assignment subscription Offset #### Assignment subscription Offset
```java ```java
// get offset
long position(TopicPartition partition) throws SQLException; long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException; Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException; Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException; Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
// Overrides the fetch offsets that the consumer will use on the next poll(timeout).
void seek(TopicPartition partition, long offset) throws SQLException; void seek(TopicPartition partition, long offset) throws SQLException;
``` ```
......
...@@ -1022,11 +1022,13 @@ while(true) { ...@@ -1022,11 +1022,13 @@ while(true) {
#### 指定订阅 Offset #### 指定订阅 Offset
```java ```java
// 获取 offset
long position(TopicPartition partition) throws SQLException; long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException; Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException; Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException; Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
// 指定下一次 poll 中使用的 offset
void seek(TopicPartition partition, long offset) throws SQLException; void seek(TopicPartition partition, long offset) throws SQLException;
``` ```
......
...@@ -51,6 +51,11 @@ DESCRIBE [db_name.]stb_name; ...@@ -51,6 +51,11 @@ DESCRIBE [db_name.]stb_name;
### 获取超级表中所有子表的标签信息 ### 获取超级表中所有子表的标签信息
```
SHOW TABLE TAGS FROM table_name [FROM db_name];
SHOW TABLE TAGS FROM [db_name.]table_name;
```
``` ```
taos> SHOW TABLE TAGS FROM st1; taos> SHOW TABLE TAGS FROM st1;
tbname | id | loc | tbname | id | loc |
......
...@@ -101,6 +101,7 @@ SHOW GRANTS; ...@@ -101,6 +101,7 @@ SHOW GRANTS;
```sql ```sql
SHOW INDEXES FROM tbl_name [FROM db_name]; SHOW INDEXES FROM tbl_name [FROM db_name];
SHOW INDEXES FROM [db_name.]tbl_name;
``` ```
显示已创建的索引。 显示已创建的索引。
...@@ -269,6 +270,7 @@ Query OK, 24 row(s) in set (0.002444s) ...@@ -269,6 +270,7 @@ Query OK, 24 row(s) in set (0.002444s)
```sql ```sql
SHOW TAGS FROM child_table_name [FROM db_name]; SHOW TAGS FROM child_table_name [FROM db_name];
SHOW TAGS FROM [db_name.]child_table_name;
``` ```
显示子表的标签信息。 显示子表的标签信息。
......
...@@ -43,6 +43,7 @@ DROP INDEX index_name; ...@@ -43,6 +43,7 @@ DROP INDEX index_name;
````sql ````sql
```sql ```sql
SHOW INDEXES FROM tbl_name [FROM db_name]; SHOW INDEXES FROM tbl_name [FROM db_name];
SHOW INDEXES FROM [db_name.]tbl_name;
```` ````
显示在所指定的数据库或表上已创建的索引。 显示在所指定的数据库或表上已创建的索引。
...@@ -16,105 +16,105 @@ ...@@ -16,105 +16,105 @@
#ifndef _TD_COMMON_TOKEN_H_ #ifndef _TD_COMMON_TOKEN_H_
#define _TD_COMMON_TOKEN_H_ #define _TD_COMMON_TOKEN_H_
#define TK_OR 1 #define TK_OR 1
#define TK_AND 2 #define TK_AND 2
#define TK_UNION 3 #define TK_UNION 3
#define TK_ALL 4 #define TK_ALL 4
#define TK_MINUS 5 #define TK_MINUS 5
#define TK_EXCEPT 6 #define TK_EXCEPT 6
#define TK_INTERSECT 7 #define TK_INTERSECT 7
#define TK_NK_BITAND 8 #define TK_NK_BITAND 8
#define TK_NK_BITOR 9 #define TK_NK_BITOR 9
#define TK_NK_LSHIFT 10 #define TK_NK_LSHIFT 10
#define TK_NK_RSHIFT 11 #define TK_NK_RSHIFT 11
#define TK_NK_PLUS 12 #define TK_NK_PLUS 12
#define TK_NK_MINUS 13 #define TK_NK_MINUS 13
#define TK_NK_STAR 14 #define TK_NK_STAR 14
#define TK_NK_SLASH 15 #define TK_NK_SLASH 15
#define TK_NK_REM 16 #define TK_NK_REM 16
#define TK_NK_CONCAT 17 #define TK_NK_CONCAT 17
#define TK_CREATE 18 #define TK_CREATE 18
#define TK_ACCOUNT 19 #define TK_ACCOUNT 19
#define TK_NK_ID 20 #define TK_NK_ID 20
#define TK_PASS 21 #define TK_PASS 21
#define TK_NK_STRING 22 #define TK_NK_STRING 22
#define TK_ALTER 23 #define TK_ALTER 23
#define TK_PPS 24 #define TK_PPS 24
#define TK_TSERIES 25 #define TK_TSERIES 25
#define TK_STORAGE 26 #define TK_STORAGE 26
#define TK_STREAMS 27 #define TK_STREAMS 27
#define TK_QTIME 28 #define TK_QTIME 28
#define TK_DBS 29 #define TK_DBS 29
#define TK_USERS 30 #define TK_USERS 30
#define TK_CONNS 31 #define TK_CONNS 31
#define TK_STATE 32 #define TK_STATE 32
#define TK_USER 33 #define TK_USER 33
#define TK_ENABLE 34 #define TK_ENABLE 34
#define TK_NK_INTEGER 35 #define TK_NK_INTEGER 35
#define TK_SYSINFO 36 #define TK_SYSINFO 36
#define TK_DROP 37 #define TK_DROP 37
#define TK_GRANT 38 #define TK_GRANT 38
#define TK_ON 39 #define TK_ON 39
#define TK_TO 40 #define TK_TO 40
#define TK_REVOKE 41 #define TK_REVOKE 41
#define TK_FROM 42 #define TK_FROM 42
#define TK_SUBSCRIBE 43 #define TK_SUBSCRIBE 43
#define TK_NK_COMMA 44 #define TK_NK_COMMA 44
#define TK_READ 45 #define TK_READ 45
#define TK_WRITE 46 #define TK_WRITE 46
#define TK_NK_DOT 47 #define TK_NK_DOT 47
#define TK_WITH 48 #define TK_WITH 48
#define TK_DNODE 49 #define TK_DNODE 49
#define TK_PORT 50 #define TK_PORT 50
#define TK_DNODES 51 #define TK_DNODES 51
#define TK_RESTORE 52 #define TK_RESTORE 52
#define TK_NK_IPTOKEN 53 #define TK_NK_IPTOKEN 53
#define TK_FORCE 54 #define TK_FORCE 54
#define TK_UNSAFE 55 #define TK_UNSAFE 55
#define TK_LOCAL 56 #define TK_LOCAL 56
#define TK_QNODE 57 #define TK_QNODE 57
#define TK_BNODE 58 #define TK_BNODE 58
#define TK_SNODE 59 #define TK_SNODE 59
#define TK_MNODE 60 #define TK_MNODE 60
#define TK_VNODE 61 #define TK_VNODE 61
#define TK_DATABASE 62 #define TK_DATABASE 62
#define TK_USE 63 #define TK_USE 63
#define TK_FLUSH 64 #define TK_FLUSH 64
#define TK_TRIM 65 #define TK_TRIM 65
#define TK_COMPACT 66 #define TK_COMPACT 66
#define TK_IF 67 #define TK_IF 67
#define TK_NOT 68 #define TK_NOT 68
#define TK_EXISTS 69 #define TK_EXISTS 69
#define TK_BUFFER 70 #define TK_BUFFER 70
#define TK_CACHEMODEL 71 #define TK_CACHEMODEL 71
#define TK_CACHESIZE 72 #define TK_CACHESIZE 72
#define TK_COMP 73 #define TK_COMP 73
#define TK_DURATION 74 #define TK_DURATION 74
#define TK_NK_VARIABLE 75 #define TK_NK_VARIABLE 75
#define TK_MAXROWS 76 #define TK_MAXROWS 76
#define TK_MINROWS 77 #define TK_MINROWS 77
#define TK_KEEP 78 #define TK_KEEP 78
#define TK_PAGES 79 #define TK_PAGES 79
#define TK_PAGESIZE 80 #define TK_PAGESIZE 80
#define TK_TSDB_PAGESIZE 81 #define TK_TSDB_PAGESIZE 81
#define TK_PRECISION 82 #define TK_PRECISION 82
#define TK_REPLICA 83 #define TK_REPLICA 83
#define TK_VGROUPS 84 #define TK_VGROUPS 84
#define TK_SINGLE_STABLE 85 #define TK_SINGLE_STABLE 85
#define TK_RETENTIONS 86 #define TK_RETENTIONS 86
#define TK_SCHEMALESS 87 #define TK_SCHEMALESS 87
#define TK_WAL_LEVEL 88 #define TK_WAL_LEVEL 88
#define TK_WAL_FSYNC_PERIOD 89 #define TK_WAL_FSYNC_PERIOD 89
#define TK_WAL_RETENTION_PERIOD 90 #define TK_WAL_RETENTION_PERIOD 90
#define TK_WAL_RETENTION_SIZE 91 #define TK_WAL_RETENTION_SIZE 91
#define TK_WAL_ROLL_PERIOD 92 #define TK_WAL_ROLL_PERIOD 92
#define TK_WAL_SEGMENT_SIZE 93 #define TK_WAL_SEGMENT_SIZE 93
#define TK_STT_TRIGGER 94 #define TK_STT_TRIGGER 94
#define TK_TABLE_PREFIX 95 #define TK_TABLE_PREFIX 95
#define TK_TABLE_SUFFIX 96 #define TK_TABLE_SUFFIX 96
#define TK_NK_COLON 97 #define TK_NK_COLON 97
#define TK_MAX_SPEED 98 #define TK_MAX_SPEED 98
#define TK_START 99 #define TK_START 99
#define TK_TIMESTAMP 100 #define TK_TIMESTAMP 100
#define TK_END 101 #define TK_END 101
#define TK_TABLE 102 #define TK_TABLE 102
...@@ -355,8 +355,6 @@ ...@@ -355,8 +355,6 @@
#define TK_WAL 337 #define TK_WAL 337
#define TK_NK_SPACE 600 #define TK_NK_SPACE 600
#define TK_NK_COMMENT 601 #define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602 #define TK_NK_ILLEGAL 602
......
...@@ -265,22 +265,6 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -265,22 +265,6 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
#if 0
if (pMgmt->pTfs) {
if (tfsDirExistAt(pMgmt->pTfs, path, (SDiskID){0})) {
terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST;
dError("vgId:%d, failed to restore vnode since %s", req.vgId, terrstr());
return -1;
}
} else {
if (taosDirExist(path)) {
terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST;
dError("vgId:%d, failed to restore vnode since %s", req.vgId, terrstr());
return -1;
}
}
#endif
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&req); tFreeSCreateVnodeReq(&req);
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
......
...@@ -220,8 +220,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -220,8 +220,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); // int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_INC(pRSmaInfo); int32_t ref = T_REF_INC(pRSmaInfo);
...@@ -232,7 +231,7 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { ...@@ -232,7 +231,7 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
} }
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -86,6 +86,9 @@ void vnodeBufPoolReset(SVBufPool* pPool); ...@@ -86,6 +86,9 @@ void vnodeBufPoolReset(SVBufPool* pPool);
void vnodeBufPoolAddToFreeList(SVBufPool* pPool); void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
int32_t vnodeBufPoolRecycle(SVBufPool* pPool); int32_t vnodeBufPoolRecycle(SVBufPool* pPool);
// vnodeOpen.c
int32_t vnodeGetPrimaryDir(const char* relPath, STfs* pTfs, char* buf, size_t bufLen);
// vnodeQuery.c // vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode); int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryPreClose(SVnode* pVnode); void vnodeQueryPreClose(SVnode* pVnode);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "meta.h" #include "meta.h"
#include "vnd.h"
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
...@@ -34,30 +35,27 @@ static void metaCleanup(SMeta **ppMeta); ...@@ -34,30 +35,27 @@ static void metaCleanup(SMeta **ppMeta);
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
int ret; int ret;
int slen; int offset;
char path[TSDB_FILENAME_LEN] = {0};
*ppMeta = NULL; *ppMeta = NULL;
// create handle // create handle
if (pVnode->pTfs) { vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, path, TSDB_FILENAME_LEN);
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_META_DIR) + 3; offset = strlen(path);
} else { snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR);
slen = strlen(pVnode->path) + strlen(VNODE_META_DIR) + 2;
} if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + strlen(path) + 1)) == NULL) {
if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + slen)) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
metaInitLock(pMeta); metaInitLock(pMeta);
pMeta->path = (char *)&pMeta[1]; pMeta->path = (char *)&pMeta[1];
if (pVnode->pTfs) { strcpy(pMeta->path, path);
sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, taosRealPath(pMeta->path, NULL, strlen(path) + 1);
VNODE_META_DIR);
} else {
sprintf(pMeta->path, "%s%s%s", pVnode->path, TD_DIRSEP, VNODE_META_DIR);
}
taosRealPath(pMeta->path, NULL, slen);
pMeta->pVnode = pVnode; pMeta->pVnode = pVnode;
// create path if not created yet // create path if not created yet
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
#include "vnd.h"
// =================================================================================================
// static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
n += tPutI8(p ? p + n : p, pFile->level);
n += tPutI64v(p ? p + n : p, pFile->size);
n += tPutI64v(p ? p + n : p, pFile->suid);
n += tPutI64v(p ? p + n : p, pFile->version);
n += tPutI64v(p ? p + n : p, pFile->mtime);
return n;
}
static int32_t tdRSmaFSToBinary(uint8_t *p, SRSmaFS *pFS) {
int32_t n = 0;
uint32_t size = taosArrayGetSize(pFS->aQTaskInf);
// version
n += tPutI8(p ? p + n : p, 0);
// SArray<SQTaskFile>
n += tPutU32v(p ? p + n : p, size);
for (uint32_t i = 0; i < size; ++i) {
n += tPutQTaskF(p ? p + n : p, taosArrayGet(pFS->aQTaskInf, i));
}
return n;
}
int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
n += tGetI8(p + n, &pFile->level);
n += tGetI64v(p + n, &pFile->size);
n += tGetI64v(p + n, &pFile->suid);
n += tGetI64v(p + n, &pFile->version);
n += tGetI64v(p + n, &pFile->mtime);
return n;
}
static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) {
int32_t code = 0;
int32_t n = 0;
int8_t version = 0;
// version
n += tGetI8(pData + n, &version);
// SArray<SQTaskFile>
taosArrayClear(pFS->aQTaskInf);
uint32_t size = 0;
n += tGetU32v(pData + n, &size);
for (uint32_t i = 0; i < size; ++i) {
SQTaskFile qTaskF = {0};
int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF);
if (nt < 0) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
n += nt;
if (taosArrayPush(pFS->aQTaskInf, &qTaskF) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
if (ASSERTS(n + sizeof(TSCKSUM) == nData, "n:%d + sizeof(TSCKSUM):%d != nData:%d", n, (int32_t)sizeof(TSCKSUM),
nData)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
_exit:
return code;
}
static int32_t tdRSmaSaveFSToFile(SRSmaFS *pFS, const char *fname) {
int32_t code = 0;
int32_t lino = 0;
// encode to binary
int32_t size = tdRSmaFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
uint8_t *pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tdRSmaFSToBinary(pData, pFS);
taosCalcChecksumAppend(0, pData, size);
// save to file
TdFilePtr pFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tdRSmaFSCreate(SRSmaFS *pFS, int32_t size) {
int32_t code = 0;
pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile));
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}
static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
SVnode *pVnode = pSma->pVnode;
int32_t offset = 0;
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, current, TSDB_FILENAME_LEN);
offset = strlen(current);
snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%sPRESENT", TD_DIRSEP, VNODE_RSMA_DIR, TD_DIRSEP);
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
offset = strlen(current_t);
snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%sPRESENT.t", TD_DIRSEP, VNODE_RSMA_DIR, TD_DIRSEP);
}
static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
uint8_t *pData = NULL;
// load binary
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size;
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosReadFile(pFD, pData, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
// decode binary
code = tsdbBinaryToFS(pData, size, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
const SQTaskFile *q1 = (const SQTaskFile *)p1;
const SQTaskFile *q2 = (const SQTaskFile *)p2;
if (q1->suid < q2->suid) {
return -1;
} else if (q1->suid > q2->suid) {
return 1;
}
if (q1->level < q2->level) {
return -1;
} else if (q1->level > q2->level) {
return 1;
}
if (q1->version < q2->version) {
return -2;
} else if (q1->version > q2->version) {
return 1;
}
return 0;
}
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFSOld = RSMA_FS(pStat);
int64_t version = pStat->commitAppliedVer;
char fname[TSDB_FILENAME_LEN] = {0};
// SQTaskFile
int32_t nNew = taosArrayGetSize(pFSNew->aQTaskInf);
int32_t iNew = 0;
while (iNew < nNew) {
SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFSNew->aQTaskInf, iNew++);
int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFSOld->aQTaskInf);
pQTaskFNew->nRef = 1;
} else {
SQTaskFile *pTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c1 = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF);
if (c1 == 0) {
// utilize the item in pFSOld->qQTaskInf, instead of pFSNew
continue;
} else if (c1 < 0) {
// NOTHING TODO
} else {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
// remove previous version
while (--idx >= 0) {
SQTaskFile *preTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c2 = tdQTaskInfCmprFn1(preTaskF, pQTaskFNew);
if (c2 == 0) {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
} else if (c2 != -2) {
break;
}
nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1);
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(pVnode, preTaskF->suid, preTaskF->level, preTaskF->version, pVnode->pTfs, fname);
(void)taosRemoveFile(fname);
taosArrayRemove(pFSOld->aQTaskInf, idx);
}
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
int32_t code = 0;
#if 0
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFS = RSMA_FS(pStat);
char fname[TSDB_FILENAME_LEN] = {0};
char fnameVer[TSDB_FILENAME_LEN] = {0};
// SArray<SQTaskFile>
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
// main.tdb =========
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version,
pVnode->pTfs, fnameVer);
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, -1, pVnode->pTfs, fname);
if (taosCheckExistFile(fnameVer)) {
if (taosRenameFile(fnameVer, fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to rename %s to %s", TD_VID(pVnode), __func__, lino, fnameVer, fname);
} else if (taosCheckExistFile(fname)) {
if (taosRemoveFile(fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to remove %s", TD_VID(pVnode), __func__, lino, fname);
}
}
{
// remove those invalid files (todo)
// main.tdb-journal.5 // TDB should handle its clear for kill -9
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
#endif
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
// open handle
code = tdRSmaFSCreate(RSMA_FS(pStat), 0);
TSDB_CHECK_CODE(code, lino, _exit);
// open impl
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (taosCheckExistFile(current)) {
code = tdRSmaLoadFSFromFile(current, RSMA_FS(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {
if (rollback) {
code = tdRSmaFSRollback(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
// 1st time open with empty current/qTaskInfoFile
code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current);
TSDB_CHECK_CODE(code, lino, _exit);
}
// scan and try fix(remove main.db/main.db.xxx and use the one with version)
code = tdRSmaFSScanAndTryFix(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
void tdRSmaFSClose(SRSmaFS *pFS) { pFS->aQTaskInf = taosArrayDestroy(pFS->aQTaskInf); }
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
char tfname[TSDB_FILENAME_LEN];
tdRSmaGetCurrentFName(pSma, NULL, tfname);
// generate PRESENT.t
code = tdRSmaSaveFSToFile(pFSNew, tfname);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SRSmaFS fs = {0};
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (!taosCheckExistFile(current_t)) {
goto _exit;
}
// rename the file
if (taosRenameFile(current_t, current) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
// load the new FS
code = tdRSmaFSCreate(&fs, 1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaLoadFSFromFile(current, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
// apply file change
code = tdRSmaFSApplyChange(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
tdRSmaFSClose(&fs);
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSFinishCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
taosWLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, rsmaFS finish commit", SMA_VID(pSma));
}
return code;
}
int32_t tdRSmaFSRollback(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno));
}
return code;
}
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize) {
int32_t code = 0;
for (int32_t i = 0; i < nSize; ++i) {
SQTaskFile *qTaskF = qTaskFile + i;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF);
if (c == 0) {
if (pTaskF->size != qTaskF->size) {
code = TSDB_CODE_RSMA_FS_UPDATE;
smaError("vgId:%d, %s failed at line %d since %s, level:%" PRIi8 ", suid:%" PRIi64 ", version:%" PRIi64
", size:%" PRIi64 " != %" PRIi64,
SMA_VID(pSma), __func__, __LINE__, tstrerror(code), pTaskF->level, pTaskF->suid, pTaskF->version,
pTaskF->size, qTaskF->size);
goto _exit;
}
continue;
}
}
if (!taosArrayInsert(pFS->aQTaskInf, idx, qTaskF)) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
_exit:
return code;
}
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
pFS->aQTaskInf = taosArrayInit_s(sizeof(SQTaskFile), size);
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i);
nRef = atomic_fetch_add_32(&qTaskF->nRef, 1);
if (nRef <= 0) {
code = TSDB_CODE_RSMA_FS_REF;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, nRef %d", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code),
nRef);
}
return code;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) {
int32_t nRef = 0;
char fname[TSDB_FILENAME_LEN];
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
if (nRef == 0) {
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version, pVnode->pTfs, fname);
if (taosRemoveFile(fname) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), fname);
}
} else if (nRef < 0) {
smaWarn("vgId:%d, abnormal unref %s since %s", TD_VID(pVnode), fname, tstrerror(TSDB_CODE_RSMA_FS_REF));
}
}
taosArrayDestroy(pFS->aQTaskInf);
}
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSRef(pSma, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
code = tdRSmaFSCreate(pFS, size);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayAddBatch(pFS->aQTaskInf, qFS->aQTaskInf->pData, size);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
...@@ -244,7 +244,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -244,7 +244,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
void *pStreamState = NULL; void *pStreamState = NULL;
// set the backend of stream state // set the backend of stream state
tdRSmaQTaskInfoGetFullPath(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir); tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);
if (!taosCheckExistFile(taskInfDir)) { if (!taosCheckExistFile(taskInfDir)) {
char *s = taosStrdup(taskInfDir); char *s = taosStrdup(taskInfDir);
if (taosMulMkDir(s) != 0) { if (taosMulMkDir(s) != 0) {
......
...@@ -173,7 +173,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { ...@@ -173,7 +173,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
SSmaEnv* pEnv = NULL; SSmaEnv* pEnv = NULL;
SRSmaStat* pStat = NULL; SRSmaStat* pStat = NULL;
SRSmaSnapWriter* pWriter = *ppWriter; SRSmaSnapWriter* pWriter = *ppWriter;
const char* primaryPath = NULL;
char fname[TSDB_FILENAME_LEN] = {0}; char fname[TSDB_FILENAME_LEN] = {0};
char fnameVer[TSDB_FILENAME_LEN] = {0}; char fnameVer[TSDB_FILENAME_LEN] = {0};
TdFilePtr pOutFD = NULL; TdFilePtr pOutFD = NULL;
...@@ -187,7 +186,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { ...@@ -187,7 +186,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
pVnode = pSma->pVnode; pVnode = pSma->pVnode;
pEnv = SMA_RSMA_ENV(pSma); pEnv = SMA_RSMA_ENV(pSma);
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv); pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
primaryPath = tfsGetPrimaryPath(pVnode->pTfs);
// rsma1/rsma2 // rsma1/rsma2
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
...@@ -241,4 +239,4 @@ _exit: ...@@ -241,4 +239,4 @@ _exit:
smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
} }
return code; return code;
} }
\ No newline at end of file
...@@ -14,31 +14,24 @@ ...@@ -14,31 +14,24 @@
*/ */
#include "sma.h" #include "sma.h"
#include "vnd.h"
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); tdRSmaGetDirName(pVnode, pTfs, true, outputName);
int32_t rsmaLen = strlen(outputName); int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid); snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid);
} }
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) { void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName) {
if (pdname) { int32_t offset = 0;
if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, // vnode
dname, TD_DIRSEP); vnodeGetPrimaryDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN);
} else { offset = strlen(outputName);
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
dname); // rsma
} snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR,
} else { (endWithSep ? TD_DIRSEP : ""));
#if 0
if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname);
}
#endif
}
} }
// smaXXXUtil ================ // smaXXXUtil ================
...@@ -51,4 +44,4 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) { ...@@ -51,4 +44,4 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdb.h" #include "tsdb.h"
#include "vnd.h"
#define ROCKS_BATCH_SIZE (4096) #define ROCKS_BATCH_SIZE (4096)
...@@ -58,16 +59,10 @@ typedef struct { ...@@ -58,16 +59,10 @@ typedef struct {
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
if (pVnode->pTfs) { vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, path, TSDB_FILENAME_LEN);
if (path) {
snprintf(path, TSDB_FILENAME_LEN, "%s%s%s%scache.rdb", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, int32_t offset = strlen(path);
pTsdb->path, TD_DIRSEP); snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);
}
} else {
if (path) {
snprintf(path, TSDB_FILENAME_LEN, "%s%scache.rdb", pTsdb->path, TD_DIRSEP);
}
}
} }
static const char *myCmpName(void *state) { static const char *myCmpName(void *state) {
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tsdb.h" #include "tsdb.h"
#include "vnd.h"
// ================================================================================================= // =================================================================================================
static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) { static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) {
...@@ -271,22 +272,20 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) { ...@@ -271,22 +272,20 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
if (pVnode->pTfs) { int32_t offset = 0;
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, // CURRENT
pTsdb->path, TD_DIRSEP); if (current) {
} vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current, TSDB_FILENAME_LEN);
if (current_t) { offset = strlen(current);
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT", TD_DIRSEP);
pTsdb->path, TD_DIRSEP); }
}
} else { // CURRENT.t
if (current) { if (current_t) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
} offset = strlen(current_t);
if (current_t) { snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT.t", TD_DIRSEP);
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP);
}
} }
} }
...@@ -1142,4 +1141,4 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -1142,4 +1141,4 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
} }
taosArrayDestroy(pFS->aDFileSet); taosArrayDestroy(pFS->aDFileSet);
} }
\ No newline at end of file
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tsdb.h" #include "tsdb.h"
#include "vnd.h"
int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) { int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
int32_t n = 0; int32_t n = 0;
...@@ -282,8 +283,12 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) { ...@@ -282,8 +283,12 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
// SDelFile =============================================== // SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) { void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), int32_t offset = 0;
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pFile->commitID, ".del");
vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);
offset = strlen(fname);
snprintf((char *)fname + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%dver%" PRId64 ".del", TD_DIRSEP,
TD_VID(pTsdb->pVnode), pFile->commitID);
} }
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) { int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {
......
...@@ -290,11 +290,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { ...@@ -290,11 +290,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
pInfo->txn = metaGetTxn(pVnode->pMeta); pInfo->txn = metaGetTxn(pVnode->pMeta);
// save info // save info
if (pVnode->pTfs) { vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode)); vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
if (vnodeSaveInfo(dir, &pInfo->info) < 0) { if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
...@@ -427,11 +423,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -427,11 +423,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
return -1; return -1;
} }
if (pVnode->pTfs) { vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed); syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
...@@ -493,16 +485,22 @@ _exit: ...@@ -493,16 +485,22 @@ _exit:
bool vnodeShouldRollback(SVnode *pVnode) { bool vnodeShouldRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0}; char tFName[TSDB_FILENAME_LEN] = {0};
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, int32_t offset = 0;
VND_INFO_FNAME_TMP);
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
return taosCheckExistFile(tFName); return taosCheckExistFile(tFName);
} }
void vnodeRollback(SVnode *pVnode) { void vnodeRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0}; char tFName[TSDB_FILENAME_LEN] = {0};
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, int32_t offset = 0;
VND_INFO_FNAME_TMP);
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
(void)taosRemoveFile(tFName); (void)taosRemoveFile(tFName);
} }
......
...@@ -15,6 +15,16 @@ ...@@ -15,6 +15,16 @@
#include "vnd.h" #include "vnd.h"
int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) {
if (pTfs) {
snprintf(buf, bufLen - 1, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath);
} else {
snprintf(buf, bufLen - 1, "%s", relPath);
}
buf[bufLen - 1] = '\0';
return 0;
}
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
...@@ -26,17 +36,9 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -26,17 +36,9 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
} }
// create vnode env // create vnode env
if (pTfs) { vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
if (tfsMkdirAt(pTfs, path, (SDiskID){0}) < 0) { if (taosMkDir(dir)) {
vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(terrno)); return TAOS_SYSTEM_ERROR(errno);
return -1;
}
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
} else {
if (taosMkDir(path)) {
return TAOS_SYSTEM_ERROR(errno);
}
snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
} }
if (pCfg) { if (pCfg) {
...@@ -63,11 +65,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p ...@@ -63,11 +65,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0; int32_t ret = 0;
if (pTfs) { vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
}
ret = vnodeLoadInfo(dir, &info); ret = vnodeLoadInfo(dir, &info);
if (ret < 0) { if (ret < 0) {
...@@ -185,21 +183,12 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr ...@@ -185,21 +183,12 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
return ret; return ret;
} }
int32_t vnodeGetAbsDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) {
if (pTfs) {
snprintf(buf, bufLen, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath);
} else {
snprintf(buf, bufLen, "%s", relPath);
}
return 0;
}
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) { int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0; int32_t ret = 0;
vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
ret = vnodeLoadInfo(dir, &info); ret = vnodeLoadInfo(dir, &info);
if (ret < 0) { if (ret < 0) {
...@@ -258,7 +247,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s ...@@ -258,7 +247,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetAbsDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) == 0) { if (vnodeLoadInfo(dir, &info) == 0) {
if (info.config.vgId != dstVgId) { if (info.config.vgId != dstVgId) {
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId); vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
...@@ -267,7 +256,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s ...@@ -267,7 +256,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return dstVgId; return dstVgId;
} }
vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) < 0) { if (vnodeLoadInfo(dir, &info) < 0) {
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno)); vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
return -1; return -1;
...@@ -302,11 +291,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -302,11 +291,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
char tdir[TSDB_FILENAME_LEN * 2] = {0}; char tdir[TSDB_FILENAME_LEN * 2] = {0};
int32_t ret = 0; int32_t ret = 0;
if (pTfs) { vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
}
info.config = vnodeCfgDefault; info.config = vnodeCfgDefault;
......
...@@ -35,11 +35,7 @@ static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) { ...@@ -35,11 +35,7 @@ static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
pInfo->commitID = ++pVnode->state.commitID; pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) { vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
if (vnodeLoadInfo(dir, &pInfo->info) < 0) { if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
code = terrno; code = terrno;
...@@ -64,11 +60,7 @@ static int32_t vnodeRetentionTask(void *param) { ...@@ -64,11 +60,7 @@ static int32_t vnodeRetentionTask(void *param) {
SVnode *pVnode = pInfo->pVnode; SVnode *pVnode = pInfo->pVnode;
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) { vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
// save info // save info
pInfo->info.state.commitID = pInfo->commitID; pInfo->info.state.commitID = pInfo->commitID;
...@@ -130,4 +122,4 @@ _exit: ...@@ -130,4 +122,4 @@ _exit:
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__); vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
} }
return 0; return 0;
} }
\ No newline at end of file
...@@ -91,12 +91,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) ...@@ -91,12 +91,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
// FIXME: if commit multiple times and the config changed? // FIXME: if commit multiple times and the config changed?
if (!pReader->cfgDone) { if (!pReader->cfgDone) {
char fName[TSDB_FILENAME_LEN]; char fName[TSDB_FILENAME_LEN];
if (pReader->pVnode->pTfs) { int32_t offset = 0;
snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pReader->pVnode->pTfs), TD_DIRSEP,
pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME); vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN);
} else { offset = strlen(fName);
snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s", pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME); snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
}
TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
if (NULL == pFile) { if (NULL == pFile) {
...@@ -344,11 +343,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * ...@@ -344,11 +343,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
.applyTerm = pWriter->info.state.commitTerm}; .applyTerm = pWriter->info.state.commitTerm};
pVnode->statis = pWriter->info.statis; pVnode->statis = pWriter->info.statis;
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
if (pWriter->pVnode->pTfs) { vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
}
vnodeCommitInfo(dir); vnodeCommitInfo(dir);
} else { } else {
...@@ -400,12 +395,7 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_ ...@@ -400,12 +395,7 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
// modify info as needed // modify info as needed
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
if (pWriter->pVnode->pTfs) { vnodeGetPrimaryDir(pWriter->pVnode->path, pWriter->pVnode->pTfs, dir, TSDB_FILENAME_LEN);
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pWriter->pVnode->pTfs), TD_DIRSEP,
pWriter->pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
}
SVnodeStats vndStats = pWriter->info.config.vndStats; SVnodeStats vndStats = pWriter->info.config.vndStats;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
......
...@@ -447,6 +447,7 @@ cmd ::= SHOW MNODES. ...@@ -447,6 +447,7 @@ cmd ::= SHOW MNODES.
cmd ::= SHOW QNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QNODES_STMT); } cmd ::= SHOW QNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QNODES_STMT); }
cmd ::= SHOW FUNCTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_FUNCTIONS_STMT); } cmd ::= SHOW FUNCTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_FUNCTIONS_STMT); }
cmd ::= SHOW INDEXES FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_INDEXES_STMT, B, A, OP_TYPE_EQUAL); } cmd ::= SHOW INDEXES FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_INDEXES_STMT, B, A, OP_TYPE_EQUAL); }
cmd ::= SHOW INDEXES FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_INDEXES_STMT, createIdentifierValueNode(pCxt, &B), createIdentifierValueNode(pCxt, &A), OP_TYPE_EQUAL); }
cmd ::= SHOW STREAMS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_STREAMS_STMT); } cmd ::= SHOW STREAMS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_STREAMS_STMT); }
cmd ::= SHOW ACCOUNTS. { pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_EXPRIE_STATEMENT); } cmd ::= SHOW ACCOUNTS. { pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_EXPRIE_STATEMENT); }
cmd ::= SHOW APPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_APPS_STMT); } cmd ::= SHOW APPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_APPS_STMT); }
...@@ -471,7 +472,9 @@ cmd ::= SHOW TABLE DISTRIBUTED full_table_name(A). ...@@ -471,7 +472,9 @@ cmd ::= SHOW TABLE DISTRIBUTED full_table_name(A).
cmd ::= SHOW CONSUMERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONSUMERS_STMT); } cmd ::= SHOW CONSUMERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONSUMERS_STMT); }
cmd ::= SHOW SUBSCRIPTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT); } cmd ::= SHOW SUBSCRIPTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT); }
cmd ::= SHOW TAGS FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, B, A, OP_TYPE_EQUAL); } cmd ::= SHOW TAGS FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, B, A, OP_TYPE_EQUAL); }
cmd ::= SHOW TAGS FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, createIdentifierValueNode(pCxt, &B), createIdentifierValueNode(pCxt, &A), OP_TYPE_EQUAL); }
cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, A, B, C); } cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, A, B, C); }
cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, createIdentifierValueNode(pCxt, &A), createIdentifierValueNode(pCxt, &B), C); }
cmd ::= SHOW VNODES NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); } cmd ::= SHOW VNODES NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); }
cmd ::= SHOW VNODES NK_STRING(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, createValueNode(pCxt, TSDB_DATA_TYPE_VARCHAR, &A)); } cmd ::= SHOW VNODES NK_STRING(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, createValueNode(pCxt, TSDB_DATA_TYPE_VARCHAR, &A)); }
// show alive // show alive
......
此差异已折叠。
...@@ -207,6 +207,7 @@ ...@@ -207,6 +207,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionUS.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionUS.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionNS.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionNS.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/information_schema.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/information_schema.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py -R
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import re
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def check_tags(self):
tdSql.checkRows(2)
tdSql.checkCols(6)
tdSql.checkData(0, 0, 'ctb1')
tdSql.checkData(0, 1, 'db')
tdSql.checkData(0, 2, 'stb')
tdSql.checkData(0, 3, 't0')
tdSql.checkData(0, 4, 'INT')
tdSql.checkData(0, 5, 1)
tdSql.checkData(1, 0, 'ctb1')
tdSql.checkData(1, 1, 'db')
tdSql.checkData(1, 2, 'stb')
tdSql.checkData(1, 3, 't1')
tdSql.checkData(1, 4, 'INT')
tdSql.checkData(1, 5, 1)
def check_table_tags(self, is_super_table):
if is_super_table == False:
tdSql.checkRows(1)
tdSql.checkCols(3)
tdSql.checkData(0, 0, 'ctb1')
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 1)
else:
tdSql.checkRows(2)
tdSql.checkCols(3)
tdSql.checkData(0, 0, 'ctb1')
tdSql.checkData(1, 0, 'ctb2')
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 2)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 2)
def check_indexes(self):
tdSql.checkRows(1)
tdSql.checkCols(7)
tdSql.checkData(0, 0, 'idx1')
tdSql.checkData(0, 1, 'db')
tdSql.checkData(0, 2, 'stb')
tdSql.checkData(0, 3, -1)
tdSql.checkData(0, 5, 't1')
tdSql.checkData(0, 6, 'tag_index')
def run(self):
tdSql.execute(f'create database db')
tdSql.execute(f'use db')
tdSql.execute(f'create table stb (ts timestamp, c0 int) tags (t0 int, t1 int)')
tdSql.execute(f'create table ctb1 using stb tags (1, 1)')
tdSql.execute(f'create table ctb2 using stb tags (2, 2)')
tdSql.execute(f'insert into ctb1 values (now, 1)')
tdSql.execute(f'insert into ctb2 values (now, 2)')
# show tags
tdSql.query(f'show tags from stb')
tdSql.checkRows(0)
tdSql.query(f'show tags from stb')
tdSql.checkRows(0);
tdSql.query(f'show tags from `stb`')
tdSql.checkRows(0);
tdSql.query(f'show tags from stb from db')
tdSql.checkRows(0);
tdSql.query(f'show tags from `stb` from `db`')
tdSql.checkRows(0);
tdSql.query(f'show tags from db.stb')
tdSql.checkRows(0);
tdSql.query(f'show tags from `db`.`stb`')
tdSql.checkRows(0);
tdSql.query(f'show tags from ctb1')
self.check_tags();
tdSql.query(f'show tags from `ctb1`')
self.check_tags();
tdSql.query(f'show tags from ctb1 from db')
self.check_tags();
tdSql.query(f'show tags from `ctb1` from `db`')
self.check_tags();
tdSql.query(f'show tags from db.ctb1')
self.check_tags();
tdSql.query(f'show tags from `db`.`ctb1`')
self.check_tags();
tdSql.error(f'show tags from db.stb from db')
tdSql.error(f'show tags from `db`.`stb` from db')
tdSql.error(f'show tags from db.ctb1 from db')
tdSql.error(f'show tags from `db`.`ctb1` from db')
# show table tags
tdSql.query(f'show table tags from stb')
self.check_table_tags(True);
tdSql.query(f'show table tags from `stb`')
self.check_table_tags(True);
tdSql.query(f'show table tags from stb from db')
self.check_table_tags(True);
tdSql.query(f'show table tags from `stb` from `db`')
self.check_table_tags(True);
tdSql.query(f'show table tags from db.stb')
self.check_table_tags(True);
tdSql.query(f'show table tags from `db`.`stb`')
self.check_table_tags(True);
tdSql.query(f'show table tags from ctb1')
self.check_table_tags(False);
tdSql.query(f'show table tags from `ctb1`')
self.check_table_tags(False);
tdSql.query(f'show table tags from ctb1 from db')
self.check_table_tags(False);
tdSql.query(f'show table tags from `ctb1` from `db`')
self.check_table_tags(False);
tdSql.query(f'show table tags from db.ctb1')
self.check_table_tags(False);
tdSql.query(f'show table tags from `db`.`ctb1`')
self.check_table_tags(False);
tdSql.error(f'show table tags from db.stb from db')
tdSql.error(f'show table tags from `db`.`stb` from db')
tdSql.error(f'show table tags from db.ctb1 from db')
tdSql.error(f'show table tags from `db`.`ctb1` from db')
# show indexes
tdSql.execute(f'create index idx1 on stb (t1)')
tdSql.query(f'show indexes from stb')
self.check_indexes();
tdSql.query(f'show indexes from `stb`')
self.check_indexes();
tdSql.query(f'show indexes from stb from db')
self.check_indexes();
tdSql.query(f'show indexes from `stb` from `db`')
self.check_indexes();
tdSql.query(f'show indexes from db.stb')
self.check_indexes();
tdSql.query(f'show indexes from `db`.`stb`')
self.check_indexes();
tdSql.query(f'show indexes from ctb1')
tdSql.checkRows(0)
tdSql.query(f'show indexes from `ctb1`')
tdSql.checkRows(0)
tdSql.query(f'show indexes from ctb1 from db')
tdSql.checkRows(0)
tdSql.query(f'show indexes from `ctb1` from `db`')
tdSql.checkRows(0)
tdSql.query(f'show indexes from db.ctb1')
tdSql.checkRows(0)
tdSql.query(f'show indexes from `db`.`ctb1`')
tdSql.checkRows(0)
tdSql.error(f'show indexes from db.stb from db')
tdSql.error(f'show indexes from `db`.`stb` from db')
tdSql.error(f'show indexes from db.ctb1 from db')
tdSql.error(f'show indexes from `db`.`ctb1` from db')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册