提交 4f1ec74e 编写于 作者: haoranc's avatar haoranc

Merge branch '3.0' of github.com:taosdata/TDengine into test/chr/TD-14699

...@@ -146,5 +146,6 @@ option( ...@@ -146,5 +146,6 @@ option(
option( option(
BUILD_WITH_INVERTEDINDEX BUILD_WITH_INVERTEDINDEX
"If use invertedIndex" "If use invertedIndex"
ON OFF
) )
...@@ -33,7 +33,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速 ...@@ -33,7 +33,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
关键不同点在于: 关键不同点在于:
1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc,具有跨平台易用的优势,但性能要下降 30%左右。 1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc,具有跨平台易用的优势,但性能要下降 30%左右。
2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](/reference/connector/cpp#参数绑定-api)[订阅](reference/connector/cpp#数据订阅接口)等等。 2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](/reference/connector/cpp#参数绑定-api)[订阅](/reference/connector/cpp#订阅和消费-api)等等。
## 安装客户端驱动 taosc ## 安装客户端驱动 taosc
......
...@@ -20,21 +20,21 @@ CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1]; ...@@ -20,21 +20,21 @@ CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1];
3. 数据库名最大长度为 33; 3. 数据库名最大长度为 33;
4. 一条 SQL 语句的最大长度为 65480 个字符; 4. 一条 SQL 语句的最大长度为 65480 个字符;
5. 创建数据库时可用的参数有: 5. 创建数据库时可用的参数有:
- cache: [Description](/reference/config/#cache) - cache: [详细说明](/reference/config/#cache)
- blocks: [Description](/reference/config/#blocks) - blocks: [详细说明](/reference/config/#blocks)
- days: [Description](/reference/config/#days) - days: [详细说明](/reference/config/#days)
- keep: [Description](/reference/config/#keep) - keep: [详细说明](/reference/config/#keep)
- minRows: [Description](/reference/config/#minrows) - minRows: [详细说明](/reference/config/#minrows)
- maxRows: [Description](/reference/config/#maxrows) - maxRows: [详细说明](/reference/config/#maxrows)
- wal: [Description](/reference/config/#wallevel) - wal: [详细说明](/reference/config/#wallevel)
- fsync: [Description](/reference/config/#fsync) - fsync: [详细说明](/reference/config/#fsync)
- update: [Description](/reference/config/#update) - update: [详细说明](/reference/config/#update)
- cacheLast: [Description](/reference/config/#cachelast) - cacheLast: [详细说明](/reference/config/#cachelast)
- replica: [Description](/reference/config/#replica) - replica: [详细说明](/reference/config/#replica)
- quorum: [Description](/reference/config/#quorum) - quorum: [详细说明](/reference/config/#quorum)
- maxVgroupsPerDb: [Description](/reference/config/#maxvgroupsperdb) - maxVgroupsPerDb: [详细说明](/reference/config/#maxvgroupsperdb)
- comp: [Description](/reference/config/#comp) - comp: [详细说明](/reference/config/#comp)
- precision: [Description](/reference/config/#precision) - precision: [详细说明](/reference/config/#precision)
6. 请注意上面列出的所有参数都可以配置在配置文件 `taosd.cfg` 中作为创建数据库时使用的默认配置, `create database` 的参数中明确指定的会覆盖配置文件中的设置。 6. 请注意上面列出的所有参数都可以配置在配置文件 `taosd.cfg` 中作为创建数据库时使用的默认配置, `create database` 的参数中明确指定的会覆盖配置文件中的设置。
::: :::
......
...@@ -33,7 +33,7 @@ Either way, same or similar APIs are provided by connectors to access database o ...@@ -33,7 +33,7 @@ Either way, same or similar APIs are provided by connectors to access database o
Key differences: Key differences:
1. With REST connection, it's not necessary to install TDengine client driver (taosc), it's more friendly for cross-platform with the cost of 30% performance downgrade. When taosc has an upgrade, application does not need to make changes. 1. With REST connection, it's not necessary to install TDengine client driver (taosc), it's more friendly for cross-platform with the cost of 30% performance downgrade. When taosc has an upgrade, application does not need to make changes.
2. With native connection, full compatibility of TDengine can be utilized, like [Parameter Binding](/reference/connector/cpp#Parameter Binding-api), [Subscription](reference/connector/cpp#Subscription), etc. But taosc has to be installed, some platforms may not be supported. 2. With native connection, full compatibility of TDengine can be utilized, like [Parameter Binding](/reference/connector/cpp#parameter-binding-api), [Subscription](/reference/connector/cpp#subscription-and-consumption-api), etc. But taosc has to be installed, some platforms may not be supported.
## Install Client Driver taosc ## Install Client Driver taosc
......
...@@ -34,7 +34,7 @@ CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1]; ...@@ -34,7 +34,7 @@ CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1];
- quorum: [Description](/reference/config/#quorum) - quorum: [Description](/reference/config/#quorum)
- maxVgroupsPerDb: [Description](/reference/config/#maxvgroupsperdb) - maxVgroupsPerDb: [Description](/reference/config/#maxvgroupsperdb)
- comp: [Description](/reference/config/#comp) - comp: [Description](/reference/config/#comp)
- precision: [Description](reference/config/#precision) - precision: [Description](/reference/config/#precision)
6. Please be noted that all of the parameters mentioned in this section can be configured in configuration file `taosd.cfg` at server side and used by default, can be override if they are specified in `create database` statement. 6. Please be noted that all of the parameters mentioned in this section can be configured in configuration file `taosd.cfg` at server side and used by default, can be override if they are specified in `create database` statement.
::: :::
......
...@@ -78,7 +78,7 @@ Manually install the following tools. ...@@ -78,7 +78,7 @@ Manually install the following tools.
- Install [Python](https://www.python.org/downloads/) 2.7 (`v3.x.x` is not supported) and execute `npm config set python python2.7`. - Install [Python](https://www.python.org/downloads/) 2.7 (`v3.x.x` is not supported) and execute `npm config set python python2.7`.
- Go to the `cmd` command-line interface, `npm config set msvs_version 2017` - Go to the `cmd` command-line interface, `npm config set msvs_version 2017`
Refer to Microsoft's Node.js User Manual [Microsoft's Node.js Guidelines for Windows](https://github.com/Microsoft/nodejs-guidelines/blob/master/windows- environment. md#compiling-native-addon-modules). Refer to Microsoft's Node.js User Manual [Microsoft's Node.js Guidelines for Windows](https://github.com/Microsoft/nodejs-guidelines/blob/master/windows-environment.md#compiling-native-addon-modules).
If using ARM64 Node.js on Windows 10 ARM, you must add "Visual C++ compilers and libraries for ARM64" and "Visual C++ ATL for ARM64". If using ARM64 Node.js on Windows 10 ARM, you must add "Visual C++ compilers and libraries for ARM64" and "Visual C++ ATL for ARM64".
......
...@@ -32,7 +32,7 @@ We will explain how to migrate OpenTSDB applications to TDengine quickly, secure ...@@ -32,7 +32,7 @@ We will explain how to migrate OpenTSDB applications to TDengine quickly, secure
The following figure (Figure 1) shows the system's overall architecture for a typical DevOps application scenario. The following figure (Figure 1) shows the system's overall architecture for a typical DevOps application scenario.
**Figure 1. Typical architecture in a DevOps scenario** **Figure 1. Typical architecture in a DevOps scenario**
Figure 1. [IT-DevOps-Solutions-Immigrate-OpenTSDB-Arch](/img/IT-DevOps-Solutions-Immigrate-OpenTSDB-Arch.jpg "Figure 1. Typical architecture in a DevOps scenario") ![IT-DevOps-Solutions-Immigrate-OpenTSDB-Arch](/img/IT-DevOps-Solutions-Immigrate-OpenTSDB-Arch.jpg "Figure 1. Typical architecture in a DevOps scenario")
In this application scenario, there are Agent tools deployed in the application environment to collect machine metrics, network metrics, and application metrics. Data collectors to aggregate information collected by agents, systems for persistent data storage and management, and tools for monitoring data visualization (e.g., Grafana, etc.). In this application scenario, there are Agent tools deployed in the application environment to collect machine metrics, network metrics, and application metrics. Data collectors to aggregate information collected by agents, systems for persistent data storage and management, and tools for monitoring data visualization (e.g., Grafana, etc.).
...@@ -75,7 +75,7 @@ After writing the data to TDengine properly, you can adapt Grafana to visualize ...@@ -75,7 +75,7 @@ After writing the data to TDengine properly, you can adapt Grafana to visualize
TDengine provides two sets of Dashboard templates by default, and users only need to import the templates from the Grafana directory into Grafana to activate their use. TDengine provides two sets of Dashboard templates by default, and users only need to import the templates from the Grafana directory into Grafana to activate their use.
**Importing Grafana Templates** Figure 2. **Importing Grafana Templates** Figure 2.
! [](/img/IT-DevOps-Solutions-Immigrate-OpenTSDB-Dashboard.jpg "Figure 2. Importing a Grafana Template") ![](/img/IT-DevOps-Solutions-Immigrate-OpenTSDB-Dashboard.jpg "Figure 2. Importing a Grafana Template")
After the above steps, you completed the migration to replace OpenTSDB with TDengine. You can see that the whole process is straightforward, there is no need to write any code, and only some configuration files need to be adjusted to meet the migration work. After the above steps, you completed the migration to replace OpenTSDB with TDengine. You can see that the whole process is straightforward, there is no need to write any code, and only some configuration files need to be adjusted to meet the migration work.
...@@ -88,7 +88,7 @@ In most DevOps scenarios, if you have a small OpenTSDB cluster (3 or fewer nodes ...@@ -88,7 +88,7 @@ In most DevOps scenarios, if you have a small OpenTSDB cluster (3 or fewer nodes
Suppose your application is particularly complex, or the application domain is not a DevOps scenario. You can continue reading subsequent chapters for a more comprehensive and in-depth look at the advanced topics of migrating an OpenTSDB application to TDengine. Suppose your application is particularly complex, or the application domain is not a DevOps scenario. You can continue reading subsequent chapters for a more comprehensive and in-depth look at the advanced topics of migrating an OpenTSDB application to TDengine.
**Figure 3. System architecture after migration** **Figure 3. System architecture after migration**
! [IT-DevOps-Solutions-Immigrate-TDengine-Arch](/img/IT-DevOps-Solutions-Immigrate-TDengine-Arch.jpg "Figure 3. System architecture after migration completion") ![IT-DevOps-Solutions-Immigrate-TDengine-Arch](/img/IT-DevOps-Solutions-Immigrate-TDengine-Arch.jpg "Figure 3. System architecture after migration completion")
## Migration evaluation and strategy for other scenarios ## Migration evaluation and strategy for other scenarios
...@@ -96,7 +96,7 @@ Suppose your application is particularly complex, or the application domain is n ...@@ -96,7 +96,7 @@ Suppose your application is particularly complex, or the application domain is n
This chapter describes the differences between OpenTSDB and TDengine at the system functionality level. After reading this chapter, you can fully evaluate whether you can migrate some complex OpenTSDB-based applications to TDengine, and what you should pay attention to after migration. This chapter describes the differences between OpenTSDB and TDengine at the system functionality level. After reading this chapter, you can fully evaluate whether you can migrate some complex OpenTSDB-based applications to TDengine, and what you should pay attention to after migration.
TDengine currently only supports Grafana for visual kanban rendering, so if your application uses front-end kanban boards other than Grafana (e.g., [TSDash](https://github.com/facebook/tsdash), [Status Wolf](https://github) .com/box/StatusWolf), etc.). You cannot directly migrate those front-end kanbans to TDengine, and the front-end kanban will need to be ported to Grafana to work correctly. TDengine currently only supports Grafana for visual kanban rendering, so if your application uses front-end kanban boards other than Grafana (e.g., [TSDash](https://github.com/facebook/tsdash), [Status Wolf](https://github.com/box/StatusWolf), etc.). You cannot directly migrate those front-end kanbans to TDengine, and the front-end kanban will need to be ported to Grafana to work correctly.
TDengine version 2.3.0.x only supports collectd and StatsD as data collection aggregation software but will provide more data collection aggregation software in the future. If you use other data aggregators on the collection side, your application needs to be ported to these two data aggregation systems to write data correctly. TDengine version 2.3.0.x only supports collectd and StatsD as data collection aggregation software but will provide more data collection aggregation software in the future. If you use other data aggregators on the collection side, your application needs to be ported to these two data aggregation systems to write data correctly.
In addition to the two data aggregator software protocols mentioned above, TDengine also supports writing data directly via InfluxDB's line protocol and OpenTSDB's data writing protocol, JSON format. You can rewrite the logic on the data push side to write data using the line protocols supported by TDengine. In addition to the two data aggregator software protocols mentioned above, TDengine also supports writing data directly via InfluxDB's line protocol and OpenTSDB's data writing protocol, JSON format. You can rewrite the logic on the data push side to write data using the line protocols supported by TDengine.
......
...@@ -86,6 +86,11 @@ typedef enum { ...@@ -86,6 +86,11 @@ typedef enum {
TSDB_RETENTION_MAX = 3 TSDB_RETENTION_MAX = 3
} ERetentionLevel; } ERetentionLevel;
typedef enum {
TSDB_BITMODE_DEFAULT = 0, // 2 bits
TSDB_BITMODE_ONE_BIT = 1, // 1 bit
} EBitmapMode;
extern char *qtypeStr[]; extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11 #define TSDB_PORT_HTTP 11
......
...@@ -313,8 +313,9 @@ typedef struct { ...@@ -313,8 +313,9 @@ typedef struct {
SDataCol *cols; SDataCol *cols;
} SDataCols; } SDataCols;
static FORCE_INLINE bool tdDataColsIsBitmapI(SDataCols *pCols) { return pCols->bitmapMode != 0; } static FORCE_INLINE bool tdDataColsIsBitmapI(SDataCols *pCols) { return pCols->bitmapMode != TSDB_BITMODE_DEFAULT; }
static FORCE_INLINE void tdDataColsSetBitmapI(SDataCols *pCols) { pCols->bitmapMode = 1; } static FORCE_INLINE void tdDataColsSetBitmapI(SDataCols *pCols) { pCols->bitmapMode = TSDB_BITMODE_ONE_BIT; }
static FORCE_INLINE bool tdIsBitmapModeI(int8_t bitmapMode) { return bitmapMode != TSDB_BITMODE_DEFAULT; }
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data #define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data
......
...@@ -46,24 +46,34 @@ typedef enum { ...@@ -46,24 +46,34 @@ typedef enum {
AUTH_TYPE_OTHER, AUTH_TYPE_OTHER,
} AUTH_TYPE; } AUTH_TYPE;
typedef struct SUserAuthInfo {
char user[TSDB_USER_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
AUTH_TYPE type;
} SUserAuthInfo;
typedef struct SCatalogReq { typedef struct SCatalogReq {
SArray *pTableName; // element is SNAME SArray *pTableMeta; // element is SNAME
SArray *pUdf; // udf name SArray *pDbVgroup; // element is db full name
SArray *pTableHash; // element is SNAME
SArray *pUdf; // element is udf name
SArray *pDbCfg; // element is db full name
SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo
bool qNodeRequired; // valid qnode bool qNodeRequired; // valid qnode
} SCatalogReq; } SCatalogReq;
typedef struct SMetaData { typedef struct SMetaData {
SArray *pTableMeta; // STableMeta array SArray *pTableMeta; // SArray<STableMeta>
SArray *pVgroupInfo; // SVgroupInfo list SArray *pDbVgroup; // SArray<SArray<SVgroupInfo>*>
SArray *pUdfList; // udf info list SArray *pTableHash; // SArray<SVgroupInfo>
SArray *pQnodeList; // qnode list, SArray<SQueryNodeAddr> SArray *pUdfList; // SArray<SFuncInfo>
SArray *pDbCfg; // SArray<SDbCfgInfo>
SArray *pIndex; // SArray<SIndexInfo>
SArray *pUser; // SArray<bool>
SArray *pQnodeList; // SArray<SQueryNodeAddr>
} SMetaData; } SMetaData;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
} STbSVersion;
typedef struct SCatalogCfg { typedef struct SCatalogCfg {
uint32_t maxTblCacheNum; uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum; uint32_t maxDBCacheNum;
...@@ -88,6 +98,11 @@ typedef struct SDbVgVersion { ...@@ -88,6 +98,11 @@ typedef struct SDbVgVersion {
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SDbVgVersion; } SDbVgVersion;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
} STbSVersion;
typedef struct SUserAuthVersion { typedef struct SUserAuthVersion {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
int32_t version; int32_t version;
...@@ -96,6 +111,8 @@ typedef struct SUserAuthVersion { ...@@ -96,6 +111,8 @@ typedef struct SUserAuthVersion {
typedef SDbCfgRsp SDbCfgInfo; typedef SDbCfgRsp SDbCfgInfo;
typedef SUserIndexRsp SIndexInfo; typedef SUserIndexRsp SIndexInfo;
typedef void (*catalogCallback)(SMetaData* pResult, void* param, int32_t code);
int32_t catalogInit(SCatalogCfg *cfg); int32_t catalogInit(SCatalogCfg *cfg);
/** /**
...@@ -131,7 +148,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d ...@@ -131,7 +148,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId); int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName); int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName);
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid); int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid);
...@@ -241,9 +258,9 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_ ...@@ -241,9 +258,9 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
......
...@@ -31,6 +31,12 @@ ...@@ -31,6 +31,12 @@
extern "C" { extern "C" {
#endif #endif
#ifdef WINDOWS
#define TD_TMP_DIR_PATH "C:\\Windows\\Temp\\"
#else
#define TD_TMP_DIR_PATH "/tmp/"
#endif
typedef struct TdDir *TdDirPtr; typedef struct TdDir *TdDirPtr;
typedef struct TdDirEntry *TdDirEntryPtr; typedef struct TdDirEntry *TdDirEntryPtr;
......
...@@ -419,6 +419,7 @@ int32_t* taosGetErrno(); ...@@ -419,6 +419,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09) #define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09)
#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A) #define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A)
#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) #define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B)
#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0B)
// wal // wal
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000)
...@@ -636,6 +637,7 @@ int32_t* taosGetErrno(); ...@@ -636,6 +637,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_COMMENT_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x264E) #define TSDB_CODE_PAR_COMMENT_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x264E)
#define TSDB_CODE_PAR_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x264F) #define TSDB_CODE_PAR_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x264F)
#define TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY TAOS_DEF_ERROR_CODE(0, 0x2650) #define TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY TAOS_DEF_ERROR_CODE(0, 0x2650)
#define TSDB_CODE_PAR_INVALID_DROP_COL TAOS_DEF_ERROR_CODE(0, 0x2651)
//planner //planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
...@@ -121,7 +121,7 @@ struct SAppInstInfo { ...@@ -121,7 +121,7 @@ struct SAppInstInfo {
SCorEpSet mgmtEp; SCorEpSet mgmtEp;
SInstanceSummary summary; SInstanceSummary summary;
SList* pConnList; // STscObj linked list SList* pConnList; // STscObj linked list
int64_t clusterId; uint64_t clusterId;
void* pTransporter; void* pTransporter;
SAppHbMgr* pAppHbMgr; SAppHbMgr* pAppHbMgr;
}; };
...@@ -286,6 +286,8 @@ void initMsgHandleFp(); ...@@ -286,6 +286,8 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType); uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
......
...@@ -310,6 +310,8 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { ...@@ -310,6 +310,8 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
taosArrayDestroy(desc.subDesc); taosArrayDestroy(desc.subDesc);
desc.subDesc = NULL; desc.subDesc = NULL;
} }
} else {
desc.subDesc = NULL;
} }
releaseRequest(*rid); releaseRequest(*rid);
......
...@@ -565,10 +565,32 @@ const char *taos_get_server_info(TAOS *taos) { ...@@ -565,10 +565,32 @@ const char *taos_get_server_info(TAOS *taos) {
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
if (taos == NULL || sql == NULL) { if (taos == NULL || sql == NULL) {
// todo directly call fp fp(param, NULL, TSDB_CODE_INVALID_PARA);
return;
}
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
size_t sqlLen = strlen(sql);
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
pRequest = launchQuery(taos, sql, sqlLen);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
code = refreshMeta(taos, pRequest);
if (code) {
pRequest->code = code;
break;
}
destroyRequest(pRequest);
} }
taos_query_l(taos, sql, (int32_t)strlen(sql)); fp(param, pRequest, code);
} }
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
......
...@@ -125,10 +125,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -125,10 +125,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) { if (usedbRsp.vgVersion >= 0) {
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) { if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1));
tstrerror(code1));
} else { } else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
} }
...@@ -158,7 +158,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -158,7 +158,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
taosMemoryFreeClear(output.dbVgroup); taosMemoryFreeClear(output.dbVgroup);
tscError("failed to build use db output since %s", terrstr()); tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup) { } else if (output.dbVgroup) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
......
...@@ -855,7 +855,7 @@ SDataCols *tdNewDataCols(int maxCols, int maxRows) { ...@@ -855,7 +855,7 @@ SDataCols *tdNewDataCols(int maxCols, int maxRows) {
pCols->maxCols = maxCols; pCols->maxCols = maxCols;
pCols->numOfRows = 0; pCols->numOfRows = 0;
pCols->numOfCols = 0; pCols->numOfCols = 0;
// pCols->bitmapMode = 0; // calloc already set 0 pCols->bitmapMode = TSDB_BITMODE_DEFAULT;
if (maxCols > 0) { if (maxCols > 0) {
pCols->cols = (SDataCol *)taosMemoryCalloc(maxCols, sizeof(SDataCol)); pCols->cols = (SDataCol *)taosMemoryCalloc(maxCols, sizeof(SDataCol));
...@@ -899,7 +899,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { ...@@ -899,7 +899,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
#endif #endif
pCols->numOfRows = 0; pCols->numOfRows = 0;
pCols->bitmapMode = 0; pCols->bitmapMode = TSDB_BITMODE_DEFAULT;
pCols->numOfCols = schemaNCols(pSchema); pCols->numOfCols = schemaNCols(pSchema);
for (i = 0; i < schemaNCols(pSchema); ++i) { for (i = 0; i < schemaNCols(pSchema); ++i) {
......
...@@ -341,18 +341,19 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType, int8 ...@@ -341,18 +341,19 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType, int8
bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode) { bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode) {
int32_t nBytes = (bitmapMode == 0 ? numOfBits / TD_VTYPE_PARTS : numOfBits / TD_VTYPE_PARTS_I); int32_t nBytes = (bitmapMode == 0 ? numOfBits / TD_VTYPE_PARTS : numOfBits / TD_VTYPE_PARTS_I);
uint8_t vTypeByte = tdVTypeByte[bitmapMode][TD_VTYPE_NORM]; uint8_t vTypeByte = tdVTypeByte[bitmapMode][TD_VTYPE_NORM];
uint8_t *qBitmap = (uint8_t*)pBitmap;
for (int i = 0; i < nBytes; ++i) { for (int i = 0; i < nBytes; ++i) {
if (*((uint8_t *)pBitmap) != vTypeByte) { if (*qBitmap != vTypeByte) {
return false; return false;
} }
pBitmap = POINTER_SHIFT(pBitmap, i); qBitmap = (uint8_t *)POINTER_SHIFT(pBitmap, i);
} }
int32_t nLeft = numOfBits - nBytes * (bitmapMode == 0 ? TD_VTYPE_BITS : TD_VTYPE_BITS_I); int32_t nLeft = numOfBits - nBytes * (bitmapMode == 0 ? TD_VTYPE_BITS : TD_VTYPE_BITS_I);
for (int j = 0; j < nLeft; ++j) { for (int j = 0; j < nLeft; ++j) {
uint8_t vType; uint8_t vType;
tdGetBitmapValType(pBitmap, j, &vType, bitmapMode); tdGetBitmapValType(qBitmap, j, &vType, bitmapMode);
if (vType != TD_VTYPE_NORM) { if (vType != TD_VTYPE_NORM) {
return false; return false;
} }
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
class DndTestBnode : public ::testing::Test { class DndTestBnode : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/dbnodeTest", 9112); test.Init(TD_TMP_DIR_PATH "dbnodeTest", 9112);
taosMsleep(1100); taosMsleep(1100);
} }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class DndTestMnode : public ::testing::Test { class DndTestMnode : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/dmnodeTest", 9114); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "dmnodeTest", 9114); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class DndTestQnode : public ::testing::Test { class DndTestQnode : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/dqnodeTest", 9111); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "dqnodeTest", 9111); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class DndTestSnode : public ::testing::Test { class DndTestSnode : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/dsnodeTest", 9113); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "dsnodeTest", 9113); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -48,7 +48,7 @@ void Testbase::Init(const char* path, int16_t port) { ...@@ -48,7 +48,7 @@ void Testbase::Init(const char* path, int16_t port) {
strcpy(tsDataDir, path); strcpy(tsDataDir, path);
taosRemoveDir(path); taosRemoveDir(path);
taosMkDir(path); taosMkDir(path);
InitLog("/tmp/td"); InitLog(TD_TMP_DIR_PATH "td");
server.Start(); server.Start();
client.Init("root", "taosdata"); client.Init("root", "taosdata");
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class DndTestVnode : public ::testing::Test { class DndTestVnode : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/dvnodeTest", 9115); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "dvnodeTest", 9115); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestAcct : public ::testing::Test { class MndTestAcct : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/acctTest", 9012); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "acctTest", 9012); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -18,11 +18,11 @@ class MndTestBnode : public ::testing::Test { ...@@ -18,11 +18,11 @@ class MndTestBnode : public ::testing::Test {
public: public:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/mnode_test_bnode1", 9018); test.Init(TD_TMP_DIR_PATH "mnode_test_bnode1", 9018);
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9018"; const char* firstEp = "localhost:9018";
server2.Start("/tmp/mnode_test_bnode2", 9019); server2.Start(TD_TMP_DIR_PATH "mnode_test_bnode2", 9019);
taosMsleep(300); taosMsleep(300);
} }
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestDb : public ::testing::Test { class MndTestDb : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_db", 9030); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "mnode_test_db", 9030); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -18,14 +18,14 @@ class MndTestDnode : public ::testing::Test { ...@@ -18,14 +18,14 @@ class MndTestDnode : public ::testing::Test {
public: public:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/dnode_test_dnode1", 9023); test.Init(TD_TMP_DIR_PATH "dnode_test_dnode1", 9023);
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9023"; const char* firstEp = "localhost:9023";
// server2.Start("/tmp/dnode_test_dnode2", fqdn, 9024, firstEp); // server2.Start(TD_TMP_DIR_PATH "dnode_test_dnode2", fqdn, 9024, firstEp);
// server3.Start("/tmp/dnode_test_dnode3", fqdn, 9025, firstEp); // server3.Start(TD_TMP_DIR_PATH "dnode_test_dnode3", fqdn, 9025, firstEp);
// server4.Start("/tmp/dnode_test_dnode4", fqdn, 9026, firstEp); // server4.Start(TD_TMP_DIR_PATH "dnode_test_dnode4", fqdn, 9026, firstEp);
// server5.Start("/tmp/dnode_test_dnode5", fqdn, 9027, firstEp); // server5.Start(TD_TMP_DIR_PATH "dnode_test_dnode5", fqdn, 9027, firstEp);
taosMsleep(300); taosMsleep(300);
} }
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestFunc : public ::testing::Test { class MndTestFunc : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_func", 9038); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "mnode_test_func", 9038); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -18,11 +18,11 @@ class MndTestMnode : public ::testing::Test { ...@@ -18,11 +18,11 @@ class MndTestMnode : public ::testing::Test {
public: public:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/mnode_test_mnode1", 9028); test.Init(TD_TMP_DIR_PATH "mnode_test_mnode1", 9028);
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9028"; const char* firstEp = "localhost:9028";
// server2.Start("/tmp/mnode_test_mnode2", fqdn, 9029, firstEp); // server2.Start(TD_TMP_DIR_PATH "mnode_test_mnode2", fqdn, 9029, firstEp);
taosMsleep(300); taosMsleep(300);
} }
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestProfile : public ::testing::Test { class MndTestProfile : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9031); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "mnode_test_profile", 9031); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -18,11 +18,11 @@ class MndTestQnode : public ::testing::Test { ...@@ -18,11 +18,11 @@ class MndTestQnode : public ::testing::Test {
public: public:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/mnode_test_qnode1", 9014); test.Init(TD_TMP_DIR_PATH "mnode_test_qnode1", 9014);
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9014"; const char* firstEp = "localhost:9014";
// server2.Start("/tmp/mnode_test_qnode2", fqdn, 9015, firstEp); // server2.Start(TD_TMP_DIR_PATH "mnode_test_qnode2", fqdn, 9015, firstEp);
taosMsleep(300); taosMsleep(300);
} }
......
...@@ -31,7 +31,7 @@ class MndTestSdb : public ::testing::Test { ...@@ -31,7 +31,7 @@ class MndTestSdb : public ::testing::Test {
tsLogEmbedded = 1; tsLogEmbedded = 1;
tsAsyncLog = 0; tsAsyncLog = 0;
const char *path = "/tmp/td"; const char *path = TD_TMP_DIR_PATH "td";
taosRemoveDir(path); taosRemoveDir(path);
taosMkDir(path); taosMkDir(path);
tstrncpy(tsLogDir, path, PATH_MAX); tstrncpy(tsLogDir, path, PATH_MAX);
...@@ -385,7 +385,7 @@ TEST_F(MndTestSdb, 01_Write_Str) { ...@@ -385,7 +385,7 @@ TEST_F(MndTestSdb, 01_Write_Str) {
mnode.v100 = 100; mnode.v100 = 100;
mnode.v200 = 200; mnode.v200 = 200;
opt.pMnode = &mnode; opt.pMnode = &mnode;
opt.path = "/tmp/mnode_test_sdb"; opt.path = TD_TMP_DIR_PATH "mnode_test_sdb";
taosRemoveDir(opt.path); taosRemoveDir(opt.path);
SSdbTable strTable1; SSdbTable strTable1;
...@@ -730,7 +730,7 @@ TEST_F(MndTestSdb, 01_Read_Str) { ...@@ -730,7 +730,7 @@ TEST_F(MndTestSdb, 01_Read_Str) {
mnode.v100 = 100; mnode.v100 = 100;
mnode.v200 = 200; mnode.v200 = 200;
opt.pMnode = &mnode; opt.pMnode = &mnode;
opt.path = "/tmp/mnode_test_sdb"; opt.path = TD_TMP_DIR_PATH "mnode_test_sdb";
SSdbTable strTable1; SSdbTable strTable1;
memset(&strTable1, 0, sizeof(SSdbTable)); memset(&strTable1, 0, sizeof(SSdbTable));
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestShow : public ::testing::Test { class MndTestShow : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_show", 9021); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "mnode_test_show", 9021); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestSma : public ::testing::Test { class MndTestSma : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_sma", 9035); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "mnode_test_sma", 9035); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -18,11 +18,11 @@ class MndTestSnode : public ::testing::Test { ...@@ -18,11 +18,11 @@ class MndTestSnode : public ::testing::Test {
public: public:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/mnode_test_snode1", 9016); test.Init(TD_TMP_DIR_PATH "mnode_test_snode1", 9016);
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9016"; const char* firstEp = "localhost:9016";
// server2.Start("/tmp/mnode_test_snode2", fqdn, 9017, firstEp); // server2.Start(TD_TMP_DIR_PATH "mnode_test_snode2", fqdn, 9017, firstEp);
taosMsleep(300); taosMsleep(300);
} }
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestStb : public ::testing::Test { class MndTestStb : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_stb", 9034); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "mnode_test_stb", 9034); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestTopic : public ::testing::Test { class MndTestTopic : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_topic", 9039); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "mnode_test_topic", 9039); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -14,10 +14,10 @@ ...@@ -14,10 +14,10 @@
class MndTestTrans1 : public ::testing::Test { class MndTestTrans1 : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/mnode_test_trans1", 9013); test.Init(TD_TMP_DIR_PATH "mnode_test_trans1", 9013);
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9013"; const char* firstEp = "localhost:9013";
// server2.Start("/tmp/mnode_test_trans2", fqdn, 9020, firstEp); // server2.Start(TD_TMP_DIR_PATH "mnode_test_trans2", fqdn, 9020, firstEp);
} }
static void TearDownTestSuite() { static void TearDownTestSuite() {
...@@ -26,7 +26,7 @@ class MndTestTrans1 : public ::testing::Test { ...@@ -26,7 +26,7 @@ class MndTestTrans1 : public ::testing::Test {
} }
static void KillThenRestartServer() { static void KillThenRestartServer() {
char file[PATH_MAX] = "/tmp/mnode_test_trans1/mnode/data/sdb.data"; char file[PATH_MAX] = TD_TMP_DIR_PATH "mnode_test_trans1/mnode/data/sdb.data";
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
int32_t size = 3 * 1024 * 1024; int32_t size = 3 * 1024 * 1024;
void* buffer = taosMemoryMalloc(size); void* buffer = taosMemoryMalloc(size);
......
...@@ -41,7 +41,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -41,7 +41,7 @@ class MndTestTrans2 : public ::testing::Test {
tsLogEmbedded = 1; tsLogEmbedded = 1;
tsAsyncLog = 0; tsAsyncLog = 0;
const char *logpath = "/tmp/td"; const char *logpath = TD_TMP_DIR_PATH "td";
taosRemoveDir(logpath); taosRemoveDir(logpath);
taosMkDir(logpath); taosMkDir(logpath);
tstrncpy(tsLogDir, logpath, PATH_MAX); tstrncpy(tsLogDir, logpath, PATH_MAX);
...@@ -68,7 +68,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -68,7 +68,7 @@ class MndTestTrans2 : public ::testing::Test {
tsTransPullupInterval = 1; tsTransPullupInterval = 1;
const char *mnodepath = "/tmp/mnode_test_trans"; const char *mnodepath = TD_TMP_DIR_PATH "mnode_test_trans";
taosRemoveDir(mnodepath); taosRemoveDir(mnodepath);
pMnode = mndOpen(mnodepath, &opt); pMnode = mndOpen(mnodepath, &opt);
mndStart(pMnode); mndStart(pMnode);
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestUser : public ::testing::Test { class MndTestUser : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_user", 9011); } static void SetUpTestSuite() { test.Init(TD_TMP_DIR_PATH "mnode_test_user", 9011); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
......
...@@ -76,9 +76,14 @@ target_link_libraries( ...@@ -76,9 +76,14 @@ target_link_libraries(
#PUBLIC scalar #PUBLIC scalar
PUBLIC transport PUBLIC transport
PUBLIC stream PUBLIC stream
PUBLIC index
) )
target_compile_definitions(vnode PUBLIC -DMETA_REFACT) target_compile_definitions(vnode PUBLIC -DMETA_REFACT)
if (${BUILD_WITH_INVERTEDINDEX})
add_definitions(-DUSE_INVERTED_INDEX)
endif(${BUILD_WITH_INVERTEDINDEX})
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) add_subdirectory(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_VNODE_META_H_ #define _TD_VNODE_META_H_
#include "vnodeInt.h" #include "vnodeInt.h"
#include "index.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -70,7 +71,11 @@ struct SMeta { ...@@ -70,7 +71,11 @@ struct SMeta {
TTB* pUidIdx; TTB* pUidIdx;
TTB* pNameIdx; TTB* pNameIdx;
TTB* pCtbIdx; TTB* pCtbIdx;
#ifdef USE_INVERTED_INDEX
void* pTagIvtIdx;
#else
TTB* pTagIdx; TTB* pTagIdx;
#endif
TTB* pTtlIdx; TTB* pTtlIdx;
TTB* pSmaIdx; TTB* pSmaIdx;
SMetaIdx* pIdx; SMetaIdx* pIdx;
......
...@@ -53,10 +53,10 @@ int metaOpenIdx(SMeta *pMeta) { ...@@ -53,10 +53,10 @@ int metaOpenIdx(SMeta *pMeta) {
#endif #endif
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
SIndexOpts opts; // SIndexOpts opts;
if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) { // if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
return -1; // return -1;
} //}
#endif #endif
return 0; return 0;
...@@ -71,36 +71,37 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */ ...@@ -71,36 +71,37 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */
#endif #endif
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
SIndexOpts opts; // SIndexOpts opts;
if (indexClose(pMeta->pIdx->pIdx) != 0) { // if (indexClose(pMeta->pIdx->pIdx) != 0) {
return -1; // return -1;
} //}
// return 0;
#endif #endif
} }
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) { int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) {
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
if (pTbCfgs->type == META_CHILD_TABLE) { // if (pTbCfgs->type == META_CHILD_TABLE) {
char buf[8] = {0}; // char buf[8] = {0};
int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId; // int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
sprintf(buf, "%d", colId); // colname // sprintf(buf, "%d", colId); // colname
char *pTagVal = (char *)tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId); // char *pTagVal = (char *)tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
tb_uid_t suid = pTbCfg->ctbCfg.suid; // super id // tb_uid_t suid = pTbCfg->ctbCfg.suid; // super id
tb_uid_t tuid = 0; // child table uid // tb_uid_t tuid = 0; // child table uid
SIndexMultiTerm *terms = indexMultiTermCreate(); // SIndexMultiTerm *terms = indexMultiTermCreate();
SIndexTerm *term = // SIndexTerm *term =
indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BINARY, buf, strlen(buf), pTagVal, strlen(pTagVal), tuid); // indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BINARY, buf, strlen(buf), pTagVal, strlen(pTagVal), tuid);
indexMultiTermAdd(terms, term); // indexMultiTermAdd(terms, term);
int ret = indexPut(pMeta->pIdx->pIdx, terms); // int ret = indexPut(pMeta->pIdx->pIdx, terms);
indexMultiTermDestroy(terms); // indexMultiTermDestroy(terms);
return ret; // return ret;
} else { //} else {
return DB_DONOTINDEX; // return DB_DONOTINDEX;
} //}
#endif #endif
// TODO // TODO
return 0; return 0;
......
...@@ -93,11 +93,24 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -93,11 +93,24 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
} }
// open pTagIdx // open pTagIdx
#ifdef USE_INVERTED_INDEX
// TODO(yihaoDeng), refactor later
char indexFullPath[128] = {0};
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
taosMkDir(indexFullPath);
ret = indexOpen(indexOptsCreate(), indexFullPath, (SIndex **)&pMeta->pTagIvtIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
#else
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx); ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
#endif
// open pTtlIdx // open pTtlIdx
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx); ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
...@@ -128,7 +141,11 @@ _err: ...@@ -128,7 +141,11 @@ _err:
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
#ifdef USE_INVERTED_INDEX
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
#else
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
#endif
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
...@@ -145,7 +162,11 @@ int metaClose(SMeta *pMeta) { ...@@ -145,7 +162,11 @@ int metaClose(SMeta *pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
#ifdef USE_INVERTED_INDEX
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
#else
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
#endif
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
......
...@@ -30,9 +30,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -30,9 +30,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int vLen = 0; int vLen = 0;
const void *pKey = NULL; const void *pKey = NULL;
const void *pVal = NULL; const void *pVal = NULL;
void *pBuf = NULL; void * pBuf = NULL;
int32_t szBuf = 0; int32_t szBuf = 0;
void *p = NULL; void * p = NULL;
SMetaReader mr = {0}; SMetaReader mr = {0};
// validate req // validate req
...@@ -71,9 +71,9 @@ _err: ...@@ -71,9 +71,9 @@ _err:
} }
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
TBC *pNameIdxc = NULL; TBC * pNameIdxc = NULL;
TBC *pUidIdxc = NULL; TBC * pUidIdxc = NULL;
TBC *pCtbIdxc = NULL; TBC * pCtbIdxc = NULL;
SCtbIdxKey *pCtbIdxKey; SCtbIdxKey *pCtbIdxKey;
const void *pKey = NULL; const void *pKey = NULL;
int nKey; int nKey;
...@@ -134,8 +134,8 @@ _err: ...@@ -134,8 +134,8 @@ _err:
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0}; SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0}; SMetaEntry nStbEntry = {0};
TBC *pUidIdxc = NULL; TBC * pUidIdxc = NULL;
TBC *pTbDbc = NULL; TBC * pTbDbc = NULL;
const void *pData; const void *pData;
int nData; int nData;
int64_t oversion; int64_t oversion;
...@@ -256,9 +256,9 @@ _err: ...@@ -256,9 +256,9 @@ _err:
} }
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) { int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
TBC *pTbDbc = NULL; TBC * pTbDbc = NULL;
TBC *pUidIdxc = NULL; TBC * pUidIdxc = NULL;
TBC *pNameIdxc = NULL; TBC * pNameIdxc = NULL;
const void *pData; const void *pData;
int nData; int nData;
tb_uid_t uid; tb_uid_t uid;
...@@ -377,14 +377,14 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi ...@@ -377,14 +377,14 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
} }
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
void *pVal = NULL; void * pVal = NULL;
int nVal = 0; int nVal = 0;
const void *pData = NULL; const void * pData = NULL;
int nData = 0; int nData = 0;
int ret = 0; int ret = 0;
tb_uid_t uid; tb_uid_t uid;
int64_t oversion; int64_t oversion;
SSchema *pColumn = NULL; SSchema * pColumn = NULL;
SMetaEntry entry = {0}; SMetaEntry entry = {0};
SSchemaWrapper *pSchema; SSchemaWrapper *pSchema;
int c; int c;
...@@ -530,7 +530,7 @@ _err: ...@@ -530,7 +530,7 @@ _err:
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry ctbEntry = {0}; SMetaEntry ctbEntry = {0};
SMetaEntry stbEntry = {0}; SMetaEntry stbEntry = {0};
void *pVal = NULL; void * pVal = NULL;
int nVal = 0; int nVal = 0;
int ret; int ret;
int c; int c;
...@@ -561,7 +561,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -561,7 +561,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
oversion = *(int64_t *)pData; oversion = *(int64_t *)pData;
// search table.db // search table.db
TBC *pTbDbc = NULL; TBC * pTbDbc = NULL;
SDecoder dc1 = {0}; SDecoder dc1 = {0};
SDecoder dc2 = {0}; SDecoder dc2 = {0};
...@@ -585,7 +585,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -585,7 +585,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaDecodeEntry(&dc2, &stbEntry); metaDecodeEntry(&dc2, &stbEntry);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema *pColumn = NULL; SSchema * pColumn = NULL;
int32_t iCol = 0; int32_t iCol = 0;
for (;;) { for (;;) {
pColumn = NULL; pColumn = NULL;
...@@ -681,8 +681,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) { ...@@ -681,8 +681,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey; STbDbKey tbDbKey;
void *pKey = NULL; void * pKey = NULL;
void *pVal = NULL; void * pVal = NULL;
int kLen = 0; int kLen = 0;
int vLen = 0; int vLen = 0;
SEncoder coder = {0}; SEncoder coder = {0};
...@@ -797,14 +797,14 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) { ...@@ -797,14 +797,14 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
} }
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
void *pData = NULL; void * pData = NULL;
int nData = 0; int nData = 0;
STbDbKey tbDbKey = {0}; STbDbKey tbDbKey = {0};
SMetaEntry stbEntry = {0}; SMetaEntry stbEntry = {0};
STagIdxKey *pTagIdxKey = NULL; STagIdxKey * pTagIdxKey = NULL;
int32_t nTagIdxKey; int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0]; const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
const void *pTagData = NULL; // const void * pTagData = NULL; //
SDecoder dc = {0}; SDecoder dc = {0};
// get super table // get super table
...@@ -820,22 +820,33 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -820,22 +820,33 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
pTagData = tdGetKVRowValOfCol((const SKVRow)pCtbEntry->ctbEntry.pTags, pTagColumn->colId); pTagData = tdGetKVRowValOfCol((const SKVRow)pCtbEntry->ctbEntry.pTags, pTagColumn->colId);
// update tag index // update tag index
#ifdef USE_INVERTED_INDEX
tb_uid_t suid = pCtbEntry->ctbEntry.suid;
tb_uid_t tuid = pCtbEntry->uid;
SIndexMultiTerm *tmGroup = indexMultiTermCreate();
SIndexTerm *tm = indexTermCreate(suid, ADD_VALUE, pTagColumn->type, pTagColumn->name, sizeof(pTagColumn->name),
pTagData, pTagData == NULL ? 0 : strlen(pTagData));
indexMultiTermAdd(tmGroup, tm);
int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid);
indexMultiTermDestroy(tmGroup);
#else
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, pTagColumn->type, pCtbEntry->uid, if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, pTagColumn->type, pCtbEntry->uid,
&pTagIdxKey, &nTagIdxKey) < 0) { &pTagIdxKey, &nTagIdxKey) < 0) {
return -1; return -1;
} }
tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn); tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
#endif
tDecoderClear(&dc); tDecoderClear(&dc);
tdbFree(pData); tdbFree(pData);
return 0; return 0;
} }
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SEncoder coder = {0}; SEncoder coder = {0};
void *pVal = NULL; void * pVal = NULL;
int vLen = 0; int vLen = 0;
int rcode = 0; int rcode = 0;
SSkmDbKey skmDbKey = {0}; SSkmDbKey skmDbKey = {0};
......
...@@ -600,6 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -600,6 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SSDataBlock block = {0}; SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows, if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) { &block.info.numOfCols) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0); ASSERT(0);
} }
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
......
...@@ -91,12 +91,22 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -91,12 +91,22 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
if (pHandle->pSchema == NULL) { if (pHandle->pSchema == NULL) {
tqError("cannot found schema for table: %ld, version %d", pHandle->msgIter.suid, pHandle->sver); tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->sver);
/*ASSERT(0);*/
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1; return -1;
} }
// this interface use suid instead of uid // this interface use suid instead of uid
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
if (pHandle->pSchemaWrapper == NULL) {
tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
pHandle->msgIter.suid, pHandle->sver);
/*ASSERT(0);*/
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
pHandle->sver = sversion; pHandle->sver = sversion;
pHandle->cachedSchemaUid = pHandle->msgIter.suid; pHandle->cachedSchemaUid = pHandle->msgIter.suid;
} }
......
...@@ -1138,6 +1138,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF ...@@ -1138,6 +1138,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
memcpy(tptr, pDataCol->pData, flen); memcpy(tptr, pDataCol->pData, flen);
if (tBitmaps > 0) { if (tBitmaps > 0) {
bptr = POINTER_SHIFT(pBlockData, lsize + flen); bptr = POINTER_SHIFT(pBlockData, lsize + flen);
if (isSuper && !tdDataColsIsBitmapI(pDataCols)) {
tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap);
}
memcpy(bptr, pDataCol->pBitmap, tBitmaps); memcpy(bptr, pDataCol->pBitmap, tBitmaps);
tBitmapsLen = tBitmaps; tBitmapsLen = tBitmaps;
flen += tBitmapsLen; flen += tBitmapsLen;
...@@ -1503,13 +1506,16 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1503,13 +1506,16 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
} else { } else {
if (lastKey != key1) { if (lastKey != key1) {
lastKey = key1; if (lastKey != TSKEY_INITIAL_VAL) {
++pTarget->numOfRows; ++pTarget->numOfRows;
} }
lastKey = key1;
}
// copy disk data // copy disk data
for (int i = 0; i < pDataCols->numOfCols; ++i) { for (int i = 0; i < pDataCols->numOfCols; ++i) {
SCellVal sVal = {0}; SCellVal sVal = {0};
// no duplicated TS keys in pDataCols from file
if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) { if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) {
TASSERT(0); TASSERT(0);
} }
......
...@@ -1732,6 +1732,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1732,6 +1732,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
if (*lastRowKey != TSKEY_INITIAL_VAL) { if (*lastRowKey != TSKEY_INITIAL_VAL) {
++(*curRow); ++(*curRow);
} }
*lastRowKey = rowKey;
++nResult; ++nResult;
} else if (update) { } else if (update) {
mergeOption = 2; mergeOption = 2;
...@@ -1739,8 +1740,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1739,8 +1740,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
mergeOption = 0; mergeOption = 0;
break; break;
} }
*lastRowKey = rowKey;
} }
} else { } else {
// TODO: use STSRowIter // TODO: use STSRowIter
...@@ -1753,6 +1752,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1753,6 +1752,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
if (*lastRowKey != TSKEY_INITIAL_VAL) { if (*lastRowKey != TSKEY_INITIAL_VAL) {
++(*curRow); ++(*curRow);
} }
*lastRowKey = rowKey;
++nResult; ++nResult;
} else if (update) { } else if (update) {
mergeOption = 2; mergeOption = 2;
...@@ -1760,7 +1760,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1760,7 +1760,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
mergeOption = 0; mergeOption = 0;
break; break;
} }
*lastRowKey = rowKey;
} else { } else {
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1); SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1);
colId = pColIdx->colId; colId = pColIdx->colId;
...@@ -1965,7 +1964,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -1965,7 +1964,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID && assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows); cur->pos >= 0 && cur->pos < pBlock->numOfRows);
// Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData interface.
TSKEY* tsArray = pCols->cols[0].pData; TSKEY* tsArray = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast); tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
...@@ -1995,6 +1994,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -1995,6 +1994,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t pos = cur->pos; int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER; cur->win = TSWINDOW_INITIALIZER;
bool adjustPos = false;
// no data in buffer, load data from file directly // no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
...@@ -2016,6 +2016,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2016,6 +2016,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
break; break;
} }
if (adjustPos) {
if (key == lastKeyAppend) {
pos -= step;
}
adjustPos = false;
}
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) || if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) { ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
break; break;
...@@ -2107,7 +2114,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2107,7 +2114,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
pos += step; pos += step;
adjustPos = true;
} else { } else {
// discard the memory record
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} }
} else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) { } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
......
...@@ -20,11 +20,11 @@ ...@@ -20,11 +20,11 @@
static void tsdbResetReadTable(SReadH *pReadh); static void tsdbResetReadTable(SReadH *pReadh);
static void tsdbResetReadFile(SReadH *pReadh); static void tsdbResetReadFile(SReadH *pReadh);
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock); static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock);
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols); static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int8_t bitmapMode);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int32_t bitmapLen, int8_t comp, static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int32_t bitmapLen, int8_t comp,
int numOfRows, int numOfBitmaps, int maxPoints, char *buffer, int bufferSize); int numOfRows, int numOfBitmaps, int maxPoints, char *buffer, int bufferSize);
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds, static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
int numOfColIds); int numOfColIds, int8_t bitmapMode);
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) { int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
...@@ -266,10 +266,11 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -266,10 +266,11 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
} }
} }
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0]) < 0) return -1;
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0], TSDB_BITMODE_ONE_BIT) < 0) return -1;
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1], TSDB_BITMODE_DEFAULT) < 0) return -1;
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
...@@ -309,10 +310,10 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, ...@@ -309,10 +310,10 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
} }
} }
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds, TSDB_BITMODE_ONE_BIT) < 0) return -1;
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds, TSDB_BITMODE_DEFAULT) < 0) return -1;
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
...@@ -543,14 +544,14 @@ static void tsdbResetReadFile(SReadH *pReadh) { ...@@ -543,14 +544,14 @@ static void tsdbResetReadFile(SReadH *pReadh) {
tsdbCloseDFileSet(TSDB_READ_FSET(pReadh)); tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
} }
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols) { static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int8_t bitmapMode) {
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
if (tsdbIsSupBlock(pBlock)) { if (tdIsBitmapModeI(bitmapMode)) {
tdDataColsSetBitmapI(pDataCols); tdDataColsSetBitmapI(pDataCols);
} }
...@@ -730,7 +731,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 ...@@ -730,7 +731,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
} }
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds, static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
int numOfColIds) { int numOfColIds, int8_t bitmapMode) {
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID); ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
...@@ -739,7 +740,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * ...@@ -739,7 +740,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
if (tsdbIsSupBlock(pBlock)) { if (tdIsBitmapModeI(bitmapMode)) {
tdDataColsSetBitmapI(pDataCols); tdDataColsSetBitmapI(pDataCols);
} }
......
...@@ -41,7 +41,7 @@ class TqMetaUpdateAppendTest : public ::testing::Test { ...@@ -41,7 +41,7 @@ class TqMetaUpdateAppendTest : public ::testing::Test {
void TearDown() override { tqStoreClose(pMeta); } void TearDown() override { tqStoreClose(pMeta); }
STqMetaStore* pMeta; STqMetaStore* pMeta;
const char* pathName = "/tmp/tq_test"; const char* pathName = TD_TMP_DIR_PATH "tq_test";
}; };
TEST_F(TqMetaUpdateAppendTest, copyPutTest) { TEST_F(TqMetaUpdateAppendTest, copyPutTest) {
......
...@@ -58,6 +58,17 @@ enum { ...@@ -58,6 +58,17 @@ enum {
CTG_ACT_MAX CTG_ACT_MAX
}; };
typedef enum {
CTG_TASK_GET_QNODE = 0,
CTG_TASK_GET_DB_VGROUP,
CTG_TASK_GET_DB_CFG,
CTG_TASK_GET_TB_META,
CTG_TASK_GET_TB_HASH,
CTG_TASK_GET_INDEX,
CTG_TASK_GET_UDF,
CTG_TASK_GET_USER,
} CTG_TASK_TYPE;
typedef struct SCtgDebug { typedef struct SCtgDebug {
bool lockEnable; bool lockEnable;
bool cacheEnable; bool cacheEnable;
...@@ -66,6 +77,43 @@ typedef struct SCtgDebug { ...@@ -66,6 +77,43 @@ typedef struct SCtgDebug {
uint32_t showCachePeriodSec; uint32_t showCachePeriodSec;
} SCtgDebug; } SCtgDebug;
typedef struct SCtgTbCacheInfo {
bool inCache;
uint64_t dbId;
uint64_t suid;
int32_t tbType;
} SCtgTbCacheInfo;
typedef struct SCtgTbMetaCtx {
SCtgTbCacheInfo tbInfo;
SName* pName;
int32_t flag;
} SCtgTbMetaCtx;
typedef struct SCtgDbVgCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbVgCtx;
typedef struct SCtgDbCfgCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbCfgCtx;
typedef struct SCtgTbHashCtx {
char dbFName[TSDB_DB_FNAME_LEN];
SName* pName;
} SCtgTbHashCtx;
typedef struct SCtgIndexCtx {
char indexFName[TSDB_INDEX_FNAME_LEN];
} SCtgIndexCtx;
typedef struct SCtgUdfCtx {
char udfName[TSDB_FUNC_NAME_LEN];
} SCtgUdfCtx;
typedef struct SCtgUserCtx {
SUserAuthInfo user;
} SCtgUserCtx;
typedef struct SCtgTbMetaCache { typedef struct SCtgTbMetaCache {
SRWLatch stbLock; SRWLatch stbLock;
...@@ -113,6 +161,55 @@ typedef struct SCatalog { ...@@ -113,6 +161,55 @@ typedef struct SCatalog {
SCtgRentMgmt stbRent; SCtgRentMgmt stbRent;
} SCatalog; } SCatalog;
typedef struct SCtgJob {
int64_t refId;
SArray* pTasks;
int32_t taskDone;
SMetaData jobRes;
int32_t rspCode;
uint64_t queryId;
SCatalog* pCtg;
void* pTrans;
const SEpSet* pMgmtEps;
void* userParam;
catalogCallback userFp;
int32_t tbMetaNum;
int32_t tbHashNum;
int32_t dbVgNum;
int32_t udfNum;
int32_t qnodeNum;
int32_t dbCfgNum;
int32_t indexNum;
int32_t userNum;
} SCtgJob;
typedef struct SCtgMsgCtx {
int32_t reqType;
void* lastOut;
void* out;
char* target;
} SCtgMsgCtx;
typedef struct SCtgTask {
CTG_TASK_TYPE type;
int32_t taskId;
SCtgJob *pJob;
void* taskCtx;
SCtgMsgCtx msgCtx;
void* res;
} SCtgTask;
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t);
typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*);
typedef struct SCtgAsyncFps {
ctgLanchTaskFp launchFp;
ctgHandleTaskMsgRspFp handleRspFp;
ctgDumpTaskResFp dumpResFp;
} SCtgAsyncFps;
typedef struct SCtgApiStat { typedef struct SCtgApiStat {
#ifdef WINDOWS #ifdef WINDOWS
...@@ -214,6 +311,7 @@ typedef struct SCtgQueue { ...@@ -214,6 +311,7 @@ typedef struct SCtgQueue {
typedef struct SCatalogMgmt { typedef struct SCatalogMgmt {
bool exit; bool exit;
int32_t jobPool;
SRWLatch lock; SRWLatch lock;
SCtgQueue queue; SCtgQueue queue;
TdThread updateThread; TdThread updateThread;
...@@ -327,10 +425,80 @@ typedef struct SCtgAction { ...@@ -327,10 +425,80 @@ typedef struct SCtgAction {
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0) #define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) #define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_PARAMS SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
extern void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p); #define CTG_PARAMS_LIST() pCtg, pTrans, pMgmtEps
extern void ctgdShowClusterCache(SCatalog* pCtg);
extern int32_t ctgdShowCacheInfo(void); void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
void ctgdShowClusterCache(SCatalog* pCtg);
int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTb(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTb(SCtgMetaAction *action);
int32_t ctgActUpdateUser(SCtgMetaAction *action);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void ctgReleaseVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
int32_t ctgStartUpdateThread();
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask);
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnode(CTG_PARAMS, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param);
int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList);
void ctgFreeJob(void* job);
void ctgFreeHandle(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache *dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2);
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;
extern SCtgAsyncFps gCtgAsyncFps[];
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_CATALOG_REMOTE_H_
#define _TD_CATALOG_REMOTE_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SCtgTaskCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
int32_t reqType;
} SCtgTaskCallbackParam;
#ifdef __cplusplus
}
#endif
#endif /*_TD_CATALOG_REMOTE_H_*/
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
...@@ -40,10 +40,8 @@ ...@@ -40,10 +40,8 @@
namespace { namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
bool *inCache, int32_t flag, uint64_t *dbId);
extern "C" int32_t ctgdGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); extern "C" int32_t ctgdGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action); extern "C" int32_t ctgActUpdateTb(SCtgMetaAction *action);
extern "C" int32_t ctgdEnableDebug(char *option); extern "C" int32_t ctgdEnableDebug(char *option);
extern "C" int32_t ctgdGetStatNum(char *option, void *res); extern "C" int32_t ctgdGetStatNum(char *option, void *res);
...@@ -52,7 +50,7 @@ void ctgTestSetRspCTableMeta(); ...@@ -52,7 +50,7 @@ void ctgTestSetRspCTableMeta();
void ctgTestSetRspSTableMeta(); void ctgTestSetRspSTableMeta();
void ctgTestSetRspMultiSTableMeta(); void ctgTestSetRspMultiSTableMeta();
extern "C" SCatalogMgmt gCtgMgmt; //extern "C" SCatalogMgmt gCtgMgmt;
enum { enum {
CTGT_RSP_VGINFO = 1, CTGT_RSP_VGINFO = 1,
...@@ -859,8 +857,12 @@ void *ctgTestGetCtableMetaThread(void *param) { ...@@ -859,8 +857,12 @@ void *ctgTestGetCtableMetaThread(void *param) {
strcpy(cn.dbname, "db1"); strcpy(cn.dbname, "db1");
strcpy(cn.tname, ctgTestCTablename); strcpy(cn.tname, ctgTestCTablename);
SCtgTbMetaCtx ctx = {0};
ctx.pName = &cn;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
while (!ctgTestStop) { while (!ctgTestStop) {
code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &inCache, 0, NULL); code = ctgReadTbMetaFromCache(pCtg, &ctx, &tbMeta);
if (code || !inCache) { if (code || !inCache) {
assert(0); assert(0);
} }
...@@ -899,7 +901,7 @@ void *ctgTestSetCtableMetaThread(void *param) { ...@@ -899,7 +901,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
msg->output = output; msg->output = output;
action.data = msg; action.data = msg;
code = ctgActUpdateTbl(&action); code = ctgActUpdateTb(&action);
if (code) { if (code) {
assert(0); assert(0);
} }
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <executorimpl.h>
#include "filter.h" #include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
...@@ -4059,7 +4058,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -4059,7 +4058,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
defaultBufsz = defaultPgsz * 4; defaultBufsz = defaultPgsz * 4;
} }
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, "/tmp/"); int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -614,7 +614,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -614,7 +614,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, TD_TMP_DIR_PATH);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -19,38 +19,11 @@ ...@@ -19,38 +19,11 @@
#include "nodes.h" #include "nodes.h"
#include "tdatablock.h" #include "tdatablock.h"
typedef struct SIFCtx { // clang-format off
int32_t code; #define SIF_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
SHashObj *pRes; /* element is SScalarParam */ #define SIF_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
bool noExec; // true: just iterate condition tree, and add hint to executor plan #define SIF_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
// SIdxFltStatus st; // clang-format on
} SIFCtx;
#define SIF_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
#define SIF_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
#define SIF_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
typedef struct SIFParam { typedef struct SIFParam {
SHashObj *pFilter; SHashObj *pFilter;
...@@ -65,6 +38,13 @@ typedef struct SIFParam { ...@@ -65,6 +38,13 @@ typedef struct SIFParam {
char colName[TSDB_COL_NAME_LEN]; char colName[TSDB_COL_NAME_LEN];
} SIFParam; } SIFParam;
typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
// SIdxFltStatus st;
} SIFCtx;
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
if (src == OP_TYPE_GREATER_THAN) { if (src == OP_TYPE_GREATER_THAN) {
*dst = QUERY_GREATER_THAN; *dst = QUERY_GREATER_THAN;
......
...@@ -990,7 +990,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR ...@@ -990,7 +990,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
size_t childKeyBufSize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); size_t childKeyBufSize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
initCatchSupporter(&pInfo->childAggSup, 1024, childKeyBufSize, initCatchSupporter(&pInfo->childAggSup, 1024, childKeyBufSize,
"StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan "StreamFinalInterval", TD_TMP_DIR_PATH); // TODO(liuyao) get row size from phy plan
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
...@@ -1181,6 +1181,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1181,6 +1181,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
......
...@@ -247,7 +247,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ ...@@ -247,7 +247,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
return NULL; return NULL;
} }
int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, "/tmp"); int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, TD_TMP_DIR_PATH);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
return NULL; return NULL;
......
...@@ -155,7 +155,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -155,7 +155,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t start = 0; int32_t start = 0;
if (pHandle->pBuf == NULL) { if (pHandle->pBuf == NULL) {
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", "/tmp"); int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", TD_TMP_DIR_PATH);
dBufSetPrintInfo(pHandle->pBuf); dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -217,7 +217,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -217,7 +217,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
} else { } else {
// multi-pass internal merge sort is required // multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) { if (pHandle->pBuf == NULL) {
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", "/tmp"); code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", TD_TMP_DIR_PATH);
dBufSetPrintInfo(pHandle->pBuf); dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -57,7 +57,7 @@ void sifInitLogFile() { ...@@ -57,7 +57,7 @@ void sifInitLogFile() {
tsAsyncLog = 0; tsAsyncLog = 0;
qDebugFlag = 159; qDebugFlag = 159;
strcpy(tsLogDir, "/tmp/sif"); strcpy(tsLogDir, TD_TMP_DIR_PATH "sif");
taosRemoveDir(tsLogDir); taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir); taosMkDir(tsLogDir);
......
...@@ -66,22 +66,18 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) { ...@@ -66,22 +66,18 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
} }
static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) { static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
SFuncInfo* pInfo = NULL; SFuncInfo funcInfo = {0};
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &pInfo); int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &funcInfo);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
if (NULL == pInfo) {
snprintf(pParam->pErrBuf, pParam->errBufLen, "Invalid function name: %s", pFunc->functionName);
return TSDB_CODE_FUNC_INVALID_FUNTION;
}
pFunc->funcType = FUNCTION_TYPE_UDF; pFunc->funcType = FUNCTION_TYPE_UDF;
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == pInfo->funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID; pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
pFunc->node.resType.type = pInfo->outputType; pFunc->node.resType.type = funcInfo.outputType;
pFunc->node.resType.bytes = pInfo->outputLen; pFunc->node.resType.bytes = funcInfo.outputLen;
pFunc->udfBufSize = pInfo->bufSize; pFunc->udfBufSize = funcInfo.bufSize;
tFreeSFuncInfo(pInfo); tFreeSFuncInfo(&funcInfo);
taosMemoryFree(pInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -255,7 +255,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, ...@@ -255,7 +255,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket); resetSlotInfo(pBucket);
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", "/tmp"); int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", TD_TMP_DIR_PATH);
if (ret != 0) { if (ret != 0) {
tMemBucketDestroy(pBucket); tMemBucketDestroy(pBucket);
return NULL; return NULL;
......
此差异已折叠。
此差异已折叠。
...@@ -31,7 +31,7 @@ if (${BUILD_WITH_INVERTEDINDEX}) ...@@ -31,7 +31,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
endif(${BUILD_WITH_INVERTEDINDEX}) endif(${BUILD_WITH_INVERTEDINDEX})
if (${BUILD_TEST}) #if (${BUILD_TEST})
add_subdirectory(test) # add_subdirectory(test)
endif(${BUILD_TEST}) #endif(${BUILD_TEST})
...@@ -63,7 +63,10 @@ typedef struct CacheTerm { ...@@ -63,7 +63,10 @@ typedef struct CacheTerm {
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type); IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type);
void indexCacheForceToMerge(void* cache);
void indexCacheDestroy(void* cache); void indexCacheDestroy(void* cache);
void indexCacheBroadcast(void* cache);
void indexCacheWait(void* cache);
Iterate* indexCacheIteratorCreate(IndexCache* cache); Iterate* indexCacheIteratorCreate(IndexCache* cache);
void indexCacheIteratorDestroy(Iterate* iiter); void indexCacheIteratorDestroy(Iterate* iiter);
......
...@@ -58,6 +58,8 @@ struct SIndex { ...@@ -58,6 +58,8 @@ struct SIndex {
SIndexStat stat; SIndexStat stat;
TdThreadMutex mtx; TdThreadMutex mtx;
tsem_t sem;
bool quit;
}; };
struct SIndexOpts { struct SIndexOpts {
...@@ -69,6 +71,7 @@ struct SIndexOpts { ...@@ -69,6 +71,7 @@ struct SIndexOpts {
int32_t cacheSize; // MB int32_t cacheSize; // MB
// add cache module later // add cache module later
#endif #endif
int32_t cacheOpt; // MB
}; };
struct SIndexMultiTermQuery { struct SIndexMultiTermQuery {
...@@ -131,42 +134,14 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf); ...@@ -131,42 +134,14 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
#define indexFatal(...) \ // clang-format off
do { \ #define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0)
if (sDebugFlag & DEBUG_FATAL) { \ #define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0)
taosPrintLog("index FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ #define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0)
} \ #define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0)
} while (0) #define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0)
#define indexError(...) \ #define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0)
do { \ // clang-format on
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLog("index ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
} while (0)
#define indexWarn(...) \
do { \
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("index WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
} while (0)
#define indexInfo(...) \
do { \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("index ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
} while (0)
#define indexDebug(...) \
do { \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("index ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define indexTrace(...) \
do { \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("index ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) #define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
......
此差异已折叠。
此差异已折叠。
...@@ -22,6 +22,29 @@ ...@@ -22,6 +22,29 @@
#include "ttypes.h" #include "ttypes.h"
#include "tvariant.h" #include "tvariant.h"
#define INDEX_DATA_BOOL_NULL 0x02
#define INDEX_DATA_TINYINT_NULL 0x80
#define INDEX_DATA_SMALLINT_NULL 0x8000
#define INDEX_DATA_INT_NULL 0x80000000L
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL 0xFF
#define INDEX_DATA_JSON_NULL 0xFFFFFFFF
#define INDEX_DATA_JSON_null 0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01
#define INDEX_DATA_UTINYINT_NULL 0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL 0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
#define INDEX_DATA_NULL_STR "NULL"
#define INDEX_DATA_NULL_STR_L "null"
char JSON_COLUMN[] = "JSON"; char JSON_COLUMN[] = "JSON";
char JSON_VALUE_DELIM = '&'; char JSON_VALUE_DELIM = '&';
...@@ -372,7 +395,7 @@ int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) { ...@@ -372,7 +395,7 @@ int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src)); tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
*dst = taosMemoryCalloc(1, tlen + 1); *dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src)); tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
*dst = (char*) * dst - tlen; *dst = (char*)*dst - tlen;
break; break;
} }
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
......
此差异已折叠。
此差异已折叠。
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "tskiplist.h" #include "tskiplist.h"
#include "tutil.h" #include "tutil.h"
static std::string dir = "/tmp/index"; static std::string dir = TD_TMP_DIR_PATH "index";
static char indexlog[PATH_MAX] = {0}; static char indexlog[PATH_MAX] = {0};
static char tindex[PATH_MAX] = {0}; static char tindex[PATH_MAX] = {0};
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册