提交 fc09f8ef 编写于 作者: L liuyao

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/ly_refactor

# apr-util
ExternalProject_Add(aprutil-1
URL https://dlcdn.apache.org//apr/apr-util-1.6.3.tar.gz
URL_HASH SHA256=2b74d8932703826862ca305b094eef2983c27b39d5c9414442e9976a9acf1983
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
#GIT_REPOSITORY https://github.com/apache/apr-util.git
#GIT_TAG 1.5.4
SOURCE_DIR "${TD_CONTRIB_DIR}/apr-util"
#BINARY_DIR ""
BUILD_IN_SOURCE TRUE
BUILD_ALWAYS 1
#UPDATE_COMMAND ""
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1/ --with-apr=$ENV{HOME}/.cos-local.1
#CONFIGURE_COMMAND ./configure --with-apr=/usr/local/apr
BUILD_COMMAND make
INSTALL_COMMAND make install
TEST_COMMAND ""
)
# apr
ExternalProject_Add(apr-1
URL https://dlcdn.apache.org//apr/apr-1.7.4.tar.gz
URL_HASH SHA256=a4137dd82a185076fa50ba54232d920a17c6469c30b0876569e1c2a05ff311d9
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
#GIT_REPOSITORY https://github.com/apache/apr.git
#GIT_TAG 1.5.2
SOURCE_DIR "${TD_CONTRIB_DIR}/apr"
BUILD_IN_SOURCE TRUE
UPDATE_DISCONNECTED TRUE
BUILD_ALWAYS 1
#UPDATE_COMMAND ""
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1/ --enable-shared=no
#CONFIGURE_COMMAND ./configure
BUILD_COMMAND make
INSTALL_COMMAND make install
TEST_COMMAND ""
)
......@@ -125,6 +125,16 @@ option(
ON
)
IF(${TD_LINUX})
option(
BUILD_WITH_COS
"If build with cos"
ON
)
ENDIF ()
option(
BUILD_WITH_SQLITE
"If build with sqlite"
......
# cos
ExternalProject_Add(cos
GIT_REPOSITORY https://github.com/tencentyun/cos-c-sdk-v5.git
GIT_TAG v5.0.16
SOURCE_DIR "${TD_CONTRIB_DIR}/cos-c-sdk-v5"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
# curl
ExternalProject_Add(curl
URL https://curl.se/download/curl-8.2.1.tar.gz
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
#GIT_REPOSITORY https://github.com/curl/curl.git
#GIT_TAG curl-7_88_1
SOURCE_DIR "${TD_CONTRIB_DIR}/curl"
BUILD_IN_SOURCE TRUE
BUILD_ALWAYS 1
#UPDATE_COMMAND ""
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --without-ssl --enable-shared=no --disable-ldap --disable-ldaps --without-brotli
#CONFIGURE_COMMAND ./configure --without-ssl
BUILD_COMMAND make
INSTALL_COMMAND make install
TEST_COMMAND ""
)
# cos
ExternalProject_Add(mxml
GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git
GIT_TAG release-2.10
SOURCE_DIR "${TD_CONTRIB_DIR}/mxml"
#BINARY_DIR ""
BUILD_IN_SOURCE TRUE
#UPDATE_COMMAND ""
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --enable-shared=no
#CONFIGURE_COMMAND ./configure
BUILD_COMMAND make
INSTALL_COMMAND make install
TEST_COMMAND ""
)
......@@ -6,6 +6,39 @@ function(cat IN_FILE OUT_FILE)
file(APPEND ${OUT_FILE} "${CONTENTS}")
endfunction(cat IN_FILE OUT_FILE)
if(${TD_LINUX})
set(CONTRIB_TMP_FILE3 "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in3")
configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
if(${BUILD_WITH_COS})
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/)
cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
endif(${BUILD_WITH_COS})
configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
execute_process(COMMAND "${CMAKE_COMMAND}" --build .
WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
set(CONTRIB_TMP_FILE2 "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in2")
configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE2})
if(${BUILD_WITH_COS})
cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE2})
endif(${BUILD_WITH_COS})
configure_file(${CONTRIB_TMP_FILE2} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
execute_process(COMMAND "${CMAKE_COMMAND}" --build .
WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
endif(${TD_LINUX})
set(CONTRIB_TMP_FILE "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in")
configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
......@@ -122,6 +155,16 @@ if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_SQLITE})
# cos
if(${BUILD_WITH_COS})
#cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
#cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
#cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
#cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/cos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_COS)
endif(${BUILD_WITH_COS})
# lucene
if(${BUILD_WITH_LUCENE})
cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
......@@ -347,6 +390,31 @@ if (${BUILD_WITH_ROCKSDB})
endif()
endif()
# cos
if(${BUILD_WITH_COS})
if(${TD_LINUX})
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.1)
#ADD_DEFINITIONS(-DMINIXML_LIBRARY=${CMAKE_BINARY_DIR}/build/lib/libxml.a)
option(ENABLE_TEST "Enable the tests" OFF)
INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include)
MESSAGE("$ENV{HOME}/.cos-local.1/include")
set(CMAKE_BUILD_TYPE debug)
set(ORIG_CMAKE_PROJECT_NAME ${CMAKE_PROJECT_NAME})
set(CMAKE_PROJECT_NAME cos_c_sdk)
add_subdirectory(cos-c-sdk-v5 EXCLUDE_FROM_ALL)
target_include_directories(
cos_c_sdk
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cos-c-sdk-v5/cos_c_sdk>
)
set(CMAKE_PROJECT_NAME ${ORIG_CMAKE_PROJECT_NAME})
else()
endif(${TD_LINUX})
endif(${BUILD_WITH_COS})
# lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev
if(${BUILD_WITH_LUCENE})
......
......@@ -3,6 +3,11 @@ if(${BUILD_WITH_ROCKSDB})
add_subdirectory(rocksdb)
endif(${BUILD_WITH_ROCKSDB})
# cos
if(${BUILD_WITH_COS})
add_subdirectory(cos)
endif(${BUILD_WITH_COS})
if(${BUILD_WITH_LUCENE})
add_subdirectory(lucene)
endif(${BUILD_WITH_LUCENE})
......
add_executable(cosTest "")
target_sources(cosTest
PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}/main.c"
)
#find_path(APR_INCLUDE_DIR apr-1/apr_time.h)
#find_path(APR_UTIL_INCLUDE_DIR apr/include/apr-1/apr_md5.h)
#find_path(MINIXML_INCLUDE_DIR mxml.h)
#find_path(CURL_INCLUDE_DIR curl/curl.h)
#include_directories (${MINIXML_INCLUDE_DIR})
#include_directories (${CURL_INCLUDE_DIR})
FIND_PROGRAM(APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/)
#FIND_PROGRAM(APU_CONFIG_BIN NAMES apu-config apu-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/)
IF (APR_CONFIG_BIN)
EXECUTE_PROCESS(
COMMAND ${APR_CONFIG_BIN} --includedir
OUTPUT_VARIABLE APR_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
ENDIF()
#IF (APU_CONFIG_BIN)
# EXECUTE_PROCESS(
# COMMAND ${APU_CONFIG_BIN} --includedir
# OUTPUT_VARIABLE APR_UTIL_INCLUDE_DIR
# OUTPUT_STRIP_TRAILING_WHITESPACE
# )
#ENDIF()
include_directories (${APR_INCLUDE_DIR})
#include_directories (${APR_UTIL_INCLUDE_DIR})
target_include_directories(
cosTest
PUBLIC "${TD_SOURCE_DIR}/contrib/cos-c-sdk-v5/cos_c_sdk"
)
#find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
#find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
#find_library(MINIXML_LIBRARY mxml)
#find_library(CURL_LIBRARY curl)
target_link_libraries(cosTest cos_c_sdk)
target_link_libraries(cosTest apr-1})
target_link_libraries(cosTest aprutil-1})
target_link_libraries(cosTest mxml)
target_link_libraries(cosTest curl)
此差异已折叠。
......@@ -98,6 +98,16 @@ typedef struct SMTbCursor {
int8_t paused;
} SMTbCursor;
typedef struct SMCtbCursor {
SMeta *pMeta;
void *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
int kLen;
int vLen;
} SMCtbCursor;
typedef struct SRowBuffPos {
void* pRowBuff;
void* pKey;
......@@ -278,13 +288,15 @@ typedef struct SStoreMeta {
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(void* pVnode);
/**
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock);
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
} SStoreMeta;
typedef struct SStoreMetaReader {
......
......@@ -107,6 +107,7 @@ typedef struct SScanLogicNode {
bool sortPrimaryKey;
bool igLastNull;
bool groupOrderScan;
bool onlyMetaCtbIdx; // for tag scan with no tbname
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -334,7 +335,11 @@ typedef struct SScanPhysiNode {
bool groupOrderScan;
} SScanPhysiNode;
typedef SScanPhysiNode STagScanPhysiNode;
typedef struct STagScanPhysiNode {
SScanPhysiNode scan;
bool onlyMetaCtbIdx; //no tbname, tag index not used.
} STagScanPhysiNode;
typedef SScanPhysiNode SBlockDistScanPhysiNode;
typedef struct SLastRowScanPhysiNode {
......
......@@ -76,7 +76,7 @@ int32_t taosUnLockFile(TdFilePtr pFile);
int32_t taosUmaskFile(int32_t maskVal);
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime);
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *atime);
int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno);
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime);
bool taosCheckExistFile(const char *pathname);
......
......@@ -16,6 +16,14 @@ ENDIF ()
IF (TD_STORAGE)
ADD_DEFINITIONS(-D_STORAGE)
TARGET_LINK_LIBRARIES(common PRIVATE storage)
IF(${TD_LINUX})
IF(${BUILD_WITH_COS})
add_definitions(-DUSE_COS)
ENDIF(${BUILD_WITH_COS})
ENDIF(${TD_LINUX})
ENDIF ()
target_include_directories(
......
......@@ -237,6 +237,14 @@ int64_t tsCheckpointInterval = 3 * 60 * 60 * 1000;
bool tsFilterScalarMode = false;
int32_t tsKeepTimeOffset = 0; // latency of data migration
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";
char tsS3AccessKeyId[TSDB_FQDN_LEN] = "<accesskeyid>";
char tsS3AccessKeySecret[TSDB_FQDN_LEN] = "<accesskeysecrect>";
char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
int8_t tsS3Enabled = false;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
......@@ -258,7 +266,43 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
int32_t taosSetTfsCfg(SConfig *pCfg);
#endif
struct SConfig *taosGetCfg() { return tsCfg; }
int32_t taosSetS3Cfg(SConfig *pCfg) {
tstrncpy(tsS3AccessKey, cfgGetItem(pCfg, "s3Accesskey")->str, TSDB_FQDN_LEN);
if (tsS3AccessKey[0] == '<') {
return 0;
}
char *colon = strchr(tsS3AccessKey, ':');
if (!colon) {
uError("invalid access key:%s", tsS3AccessKey);
return -1;
}
*colon = '\0';
tstrncpy(tsS3AccessKeyId, tsS3AccessKey, TSDB_FQDN_LEN);
tstrncpy(tsS3AccessKeySecret, colon + 1, TSDB_FQDN_LEN);
tstrncpy(tsS3Endpoint, cfgGetItem(pCfg, "s3Endpoint")->str, TSDB_FQDN_LEN);
tstrncpy(tsS3BucketName, cfgGetItem(pCfg, "s3BucketName")->str, TSDB_FQDN_LEN);
char *cos = strstr(tsS3Endpoint, "cos.");
if (cos) {
char *appid = strrchr(tsS3BucketName, '-');
if (!appid) {
uError("failed to locate appid in bucket:%s", tsS3BucketName);
return -1;
} else {
tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN);
}
}
if (tsS3BucketName[0] != '<' && tsDiskCfgNum > 1) {
#ifdef USE_COS
tsS3Enabled = true;
#endif
}
return 0;
}
struct SConfig *taosGetCfg() {
return tsCfg;
}
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) {
......@@ -581,6 +625,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1;
GRANT_CFG_ADD;
return 0;
}
......@@ -1502,6 +1550,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
if (taosSetServerCfg(tsCfg)) return -1;
if (taosSetReleaseCfg(tsCfg)) return -1;
if (taosSetTfsCfg(tsCfg) != 0) return -1;
if (taosSetS3Cfg(tsCfg) != 0) return -1;
}
taosSetSystemCfg(tsCfg);
......
......@@ -46,7 +46,7 @@ static int32_t mmDecodeOption(SJson *pJson, SMnodeOpt *pOption) {
if (code < 0) return -1;
tjsonGetInt32ValueFromDouble(replica, "role", pOption->nodeRoles[i], code);
if (code < 0) return -1;
if(pOption->nodeRoles[i] == TAOS_SYNC_ROLE_VOTER){
if (pOption->nodeRoles[i] == TAOS_SYNC_ROLE_VOTER) {
pOption->numOfReplicas++;
}
}
......@@ -65,7 +65,7 @@ int32_t mmReadFile(const char *path, SMnodeOpt *pOption) {
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json", path, TD_DIRSEP);
if (taosStatFile(file, NULL, NULL) < 0) {
if (taosStatFile(file, NULL, NULL, NULL) < 0) {
dInfo("mnode file:%s not exist", file);
return 0;
}
......
......@@ -97,7 +97,7 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
SWrapperCfg *pCfgs = NULL;
snprintf(file, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
if (taosStatFile(file, NULL, NULL) < 0) {
if (taosStatFile(file, NULL, NULL, NULL) < 0) {
dInfo("vnode file:%s not exist", file);
return 0;
}
......
......@@ -100,7 +100,7 @@ int32_t dmReadEps(SDnodeData *pData) {
goto _OVER;
}
if (taosStatFile(file, NULL, NULL) < 0) {
if (taosStatFile(file, NULL, NULL, NULL) < 0) {
dInfo("dnode file:%s not exist", file);
code = 0;
goto _OVER;
......@@ -350,7 +350,7 @@ void dmRotateMnodeEpSet(SDnodeData *pData) {
}
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
if(!pData->validMnodeEps) return;
if (!pData->validMnodeEps) return;
dmGetMnodeEpSet(pData, pEpSet);
dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
......@@ -469,7 +469,7 @@ static int32_t dmReadDnodePairs(SDnodeData *pData) {
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
if (taosStatFile(file, NULL, NULL) < 0) {
if (taosStatFile(file, NULL, NULL, NULL) < 0) {
dDebug("dnode file:%s not exist", file);
code = 0;
goto _OVER;
......
......@@ -38,7 +38,7 @@ int32_t dmReadFile(const char *path, const char *name, bool *pDeployed) {
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
if (taosStatFile(file, NULL, NULL) < 0) {
if (taosStatFile(file, NULL, NULL, NULL) < 0) {
dInfo("file:%s not exist", file);
code = 0;
goto _OVER;
......
......@@ -8,6 +8,7 @@ set(
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeModule.c"
"src/vnd/vnodeCos.c"
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
......@@ -155,6 +156,45 @@ target_link_libraries(
PUBLIC index
)
if(${TD_LINUX})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
find_library(MINIXML_LIBRARY mxml)
find_library(CURL_LIBRARY curl)
target_link_libraries(
vnode
# s3
PUBLIC cos_c_sdk_static
PUBLIC ${APR_UTIL_LIBRARY}
PUBLIC ${APR_LIBRARY}
PUBLIC ${MINIXML_LIBRARY}
PUBLIC ${CURL_LIBRARY}
)
# s3
FIND_PROGRAM(APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/)
IF (APR_CONFIG_BIN)
EXECUTE_PROCESS(
COMMAND ${APR_CONFIG_BIN} --includedir
OUTPUT_VARIABLE APR_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
ENDIF()
include_directories (${APR_INCLUDE_DIR})
target_include_directories(
vnode
PUBLIC "${TD_SOURCE_DIR}/contrib/cos-c-sdk-v5/cos_c_sdk"
PUBLIC "$ENV{HOME}/.cos-local.1/include"
)
if(${BUILD_WITH_COS})
add_definitions(-DUSE_COS)
endif(${BUILD_WITH_COS})
endif(${TD_LINUX})
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
ENDIF ()
......@@ -169,8 +209,6 @@ if(${BUILD_WITH_ROCKSDB})
add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB})
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VND_COS_H_
#define _TD_VND_COS_H_
#include "vnd.h"
#ifdef __cplusplus
extern "C" {
#endif
extern int8_t tsS3Enabled;
int32_t s3Init();
void s3CleanUp();
int32_t s3PutObjectFromFile(const char *file, const char *object);
void s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name);
bool s3Get(const char *object_name, const char *path);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VND_COS_H_*/
......@@ -167,7 +167,7 @@ int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
......
......@@ -408,17 +408,9 @@ _err:
return NULL;
}
struct SMCtbCursor {
SMeta *pMeta;
TBC *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
int kLen;
int vLen;
};
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SMCtbCursor *pCtbCur = NULL;
SCtbIdxKey ctbIdxKey;
int ret = 0;
......@@ -435,7 +427,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
metaRLock(pMeta);
}
ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
ret = tdbTbcOpen(pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pCtbCur);
......@@ -1373,7 +1365,7 @@ int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) {
}
int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode *)pVnode)->pMeta, suid, 1);
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 1);
// If len > 0 means there already have uids, and we only want the
// tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept
......
......@@ -60,7 +60,7 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
}
int64_t sz = 0;
if (taosStatFile(fname, &sz, NULL) < 0) {
if (taosStatFile(fname, &sz, NULL, NULL) < 0) {
taosCloseFile(&pFile);
taosMemoryFree(fname);
return -1;
......
......@@ -176,7 +176,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
// SDelFile
if (pTsdb->fs.pDelFile) {
tsdbDelFileName(pTsdb, pTsdb->fs.pDelFile, fname);
if (taosStatFile(fname, &size, NULL)) {
if (taosStatFile(fname, &size, NULL, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -195,7 +195,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
// head =========
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
if (taosStatFile(fname, &size, NULL)) {
if (taosStatFile(fname, &size, NULL, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -206,7 +206,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
// data =========
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
if (taosStatFile(fname, &size, NULL)) {
if (taosStatFile(fname, &size, NULL, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -221,7 +221,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
// sma =============
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
if (taosStatFile(fname, &size, NULL)) {
if (taosStatFile(fname, &size, NULL, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -237,7 +237,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
// stt ===========
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
if (taosStatFile(fname, &size, NULL)) {
if (taosStatFile(fname, &size, NULL, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
......
......@@ -14,6 +14,7 @@
*/
#include "tsdb.h"
#include "vndCos.h"
// =============== PAGE-WISE FILE ===============
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
......@@ -34,10 +35,25 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
pFD->flag = flag;
pFD->pFD = taosOpenFile(path, flag);
if (pFD->pFD == NULL) {
const char *object_name = taosDirEntryBaseName((char *)path);
long s3_size = s3Size(object_name);
if (!strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) {
s3EvictCache(path, s3_size);
s3Get(object_name, path);
pFD->pFD = taosOpenFile(path, flag);
if (pFD->pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pFD);
goto _exit;
}
} else {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pFD);
goto _exit;
}
}
pFD->szPage = szPage;
pFD->pgno = 0;
pFD->pBuf = taosMemoryCalloc(1, szPage);
......@@ -50,7 +66,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
// not check file size when reading data files.
if (flag != TD_FILE_READ) {
if (taosStatFile(path, &pFD->szFile, NULL) < 0) {
if (taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pFD->pBuf);
taosCloseFile(&pFD->pFD);
......
......@@ -15,6 +15,7 @@
#include "tsdb.h"
#include "tsdbFS2.h"
#include "vndCos.h"
typedef struct {
STsdb *tsdb;
......@@ -41,6 +42,28 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
return TARRAY2_APPEND(rtner->fopArr, op);
}
static int32_t tsdbRemoveFileObjectS3(SRTNer *rtner, const STFileObj *fobj) {
int32_t code = 0, lino = 0;
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
};
code = TARRAY2_APPEND(rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
s3DeleteObjects(&object_name, 1);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
int32_t code = 0;
int32_t lino = 0;
......@@ -76,6 +99,34 @@ _exit:
return code;
}
static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile *to) {
int32_t code = 0;
int32_t lino = 0;
char fname[TSDB_FILENAME_LEN];
TdFilePtr fdFrom = NULL;
TdFilePtr fdTo = NULL;
tsdbTFileName(rtner->tsdb, to, fname);
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
if (fdFrom == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
char *object_name = taosDirEntryBaseName(fname);
code = s3PutObjectFromFile(from->fname, object_name);
TSDB_CHECK_CODE(code, lino, _exit);
taosCloseFile(&fdFrom);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
taosCloseFile(&fdFrom);
}
return code;
}
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
int32_t code = 0;
int32_t lino = 0;
......@@ -123,6 +174,53 @@ _exit:
return code;
}
static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
int32_t code = 0;
int32_t lino = 0;
STFileOp op = {0};
// remove old
op = (STFileOp){
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
};
code = TARRAY2_APPEND(rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// create new
op = (STFileOp){
.optype = TSDB_FOP_CREATE,
.fid = fobj->f->fid,
.nf =
{
.type = fobj->f->type,
.did = did[0],
.fid = fobj->f->fid,
.cid = fobj->f->cid,
.size = fobj->f->size,
.stt[0] =
{
.level = fobj->f->stt[0].level,
},
},
};
code = TARRAY2_APPEND(rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// do copy the file
code = tsdbCopyFileS3(rtner, fobj, &op.nf);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
}
return code;
}
typedef struct {
STsdb *tsdb;
int32_t sync;
......@@ -201,9 +299,15 @@ static int32_t tsdbDoRetention2(void *arg) {
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
code = tsdbRemoveFileObjectS3(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
SSttLvl *lvl;
TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
......@@ -228,9 +332,16 @@ static int32_t tsdbDoRetention2(void *arg) {
if (fobj == NULL) continue;
if (fobj->f->did.level == did.level) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// stt
SSttLvl *lvl;
......
......@@ -76,7 +76,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
int32_t code = 0;
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
......
#define ALLOW_FORBID_FUNC
#include "vndCos.h"
extern char tsS3Endpoint[];
extern char tsS3AccessKeyId[];
extern char tsS3AccessKeySecret[];
extern char tsS3BucketName[];
extern char tsS3AppId[];
#ifdef USE_COS
#include "cos_api.h"
#include "cos_http_io.h"
#include "cos_log.h"
int32_t s3Init() {
if (cos_http_io_initialize(NULL, 0) != COSE_OK) {
return -1;
}
// set log level, default COS_LOG_WARN
cos_log_set_level(COS_LOG_WARN);
// set log output, default stderr
cos_log_set_output(NULL);
return 0;
}
void s3CleanUp() { cos_http_io_deinitialize(); }
static void log_status(cos_status_t *s) {
cos_warn_log("status->code: %d", s->code);
if (s->error_code) cos_warn_log("status->error_code: %s", s->error_code);
if (s->error_msg) cos_warn_log("status->error_msg: %s", s->error_msg);
if (s->req_id) cos_warn_log("status->req_id: %s", s->req_id);
}
static void s3InitRequestOptions(cos_request_options_t *options, int is_cname) {
options->config = cos_config_create(options->pool);
cos_config_t *config = options->config;
cos_str_set(&config->endpoint, tsS3Endpoint);
cos_str_set(&config->access_key_id, tsS3AccessKeyId);
cos_str_set(&config->access_key_secret, tsS3AccessKeySecret);
cos_str_set(&config->appid, tsS3AppId);
config->is_cname = is_cname;
options->ctl = cos_http_controller_create(options->pool, 0);
}
int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
int32_t code = 0;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket, object, file;
cos_table_t *resp_headers;
int traffic_limit = 0;
cos_pool_create(&p, NULL);
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_table_t *headers = NULL;
if (traffic_limit) {
// 限速值设置范围为819200 - 838860800,即100KB/s - 100MB/s,如果超出该范围将返回400错误
headers = cos_table_make(p, 1);
cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
}
cos_str_set(&bucket, tsS3BucketName);
cos_str_set(&file, file_str);
cos_str_set(&object, object_str);
s = cos_put_object_from_file(options, &bucket, &object, &file, headers, &resp_headers);
log_status(s);
cos_pool_destroy(p);
if (s->code != 200) {
return code = s->code;
}
return code;
}
void s3DeleteObjects(const char *object_name[], int nobject) {
cos_pool_t *p = NULL;
int is_cname = 0;
cos_string_t bucket;
cos_table_t *resp_headers = NULL;
cos_request_options_t *options = NULL;
cos_list_t object_list;
cos_list_t deleted_object_list;
int is_quiet = COS_TRUE;
cos_pool_create(&p, NULL);
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
cos_list_init(&object_list);
cos_list_init(&deleted_object_list);
for (int i = 0; i < nobject; ++i) {
cos_object_key_t *content = cos_create_cos_object_key(p);
cos_str_set(&content->key, object_name[i]);
cos_list_add_tail(&content->node, &object_list);
}
cos_status_t *s = cos_delete_objects(options, &bucket, &object_list, is_quiet, &resp_headers, &deleted_object_list);
log_status(s);
cos_pool_destroy(p);
if (cos_status_is_ok(s)) {
cos_warn_log("delete objects succeeded\n");
} else {
cos_warn_log("delete objects failed\n");
}
}
bool s3Exists(const char *object_name) {
bool ret = false;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t *resp_headers;
cos_table_t *headers = NULL;
cos_object_exist_status_e object_exist;
cos_pool_create(&p, NULL);
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
cos_str_set(&object, object_name);
s = cos_check_object_exist(options, &bucket, &object, headers, &object_exist, &resp_headers);
if (object_exist == COS_OBJECT_NON_EXIST) {
cos_warn_log("object: %.*s non exist.\n", object.len, object.data);
} else if (object_exist == COS_OBJECT_EXIST) {
ret = true;
cos_warn_log("object: %.*s exist.\n", object.len, object.data);
} else {
cos_warn_log("object: %.*s unknown status.\n", object.len, object.data);
log_status(s);
}
cos_pool_destroy(p);
return ret;
}
bool s3Get(const char *object_name, const char *path) {
bool ret = false;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_string_t file;
cos_table_t *resp_headers = NULL;
cos_table_t *headers = NULL;
int traffic_limit = 0;
//创建内存池
cos_pool_create(&p, NULL);
//初始化请求选项
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
if (traffic_limit) {
//限速值设置范围为819200 - 838860800,即100KB/s - 100MB/s,如果超出该范围将返回400错误
headers = cos_table_make(p, 1);
cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
}
//下载对象
cos_str_set(&file, path);
cos_str_set(&object, object_name);
s = cos_get_object_to_file(options, &bucket, &object, headers, NULL, &file, &resp_headers);
if (cos_status_is_ok(s)) {
ret = true;
cos_warn_log("get object succeeded\n");
} else {
cos_warn_log("get object failed\n");
}
//销毁内存池
cos_pool_destroy(p);
return ret;
}
typedef struct {
int64_t size;
int32_t atime;
char name[TSDB_FILENAME_LEN];
} SEvictFile;
static int32_t evictFileCompareAsce(const void *pLeft, const void *pRight) {
SEvictFile *lhs = (SEvictFile *)pLeft;
SEvictFile *rhs = (SEvictFile *)pRight;
return lhs->atime < rhs->atime ? -1 : 1;
}
void s3EvictCache(const char *path, long object_size) {
SDiskSize disk_size = {0};
char dir_name[TSDB_FILENAME_LEN] = "\0";
tstrncpy(dir_name, path, TSDB_FILENAME_LEN);
taosDirName(dir_name);
if (taosGetDiskSize((char *)dir_name, &disk_size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
vError("failed to get disk:%s size since %s", path, terrstr());
return;
}
if (object_size >= disk_size.avail - (1 << 30)) {
// evict too old files
// 1, list data files' atime under dir(path)
tdbDirPtr pDir = taosOpenDir(dir_name);
if (pDir == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
vError("failed to open %s since %s", dir_name, terrstr());
}
SArray *evict_files = taosArrayInit(16, sizeof(SEvictFile));
tdbDirEntryPtr pDirEntry;
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char *name = taosGetDirEntryName(pDirEntry);
if (!strncmp(name + strlen(name) - 5, ".data", 5)) {
SEvictFile e_file = {0};
char entry_name[TSDB_FILENAME_LEN] = "\0";
int dir_len = strlen(dir_name);
memcpy(e_file.name, dir_name, dir_len);
e_file.name[dir_len] = '/';
memcpy(e_file.name + dir_len + 1, name, strlen(name));
taosStatFile(e_file.name, &e_file.size, NULL, &e_file.atime);
taosArrayPush(evict_files, &e_file);
}
}
taosCloseDir(&pDir);
// 2, sort by atime
taosArraySort(evict_files, evictFileCompareAsce);
// 3, remove files ascendingly until we get enough object_size space
long evict_size = 0;
size_t ef_size = TARRAY_SIZE(evict_files);
for (size_t i = 0; i < ef_size; ++i) {
SEvictFile *evict_file = taosArrayGet(evict_files, i);
taosRemoveFile(evict_file->name);
evict_size += evict_file->size;
if (evict_size >= object_size) {
break;
}
}
taosArrayDestroy(evict_files);
}
}
long s3Size(const char *object_name) {
long size = 0;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t *resp_headers = NULL;
//创建内存池
cos_pool_create(&p, NULL);
//初始化请求选项
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
//获取对象元数据
cos_str_set(&object, object_name);
s = cos_head_object(options, &bucket, &object, NULL, &resp_headers);
// print_headers(resp_headers);
if (cos_status_is_ok(s)) {
char *content_length_str = (char *)apr_table_get(resp_headers, COS_CONTENT_LENGTH);
if (content_length_str != NULL) {
size = atol(content_length_str);
}
cos_warn_log("head object succeeded: %ld\n", size);
} else {
cos_warn_log("head object failed\n");
}
//销毁内存池
cos_pool_destroy(p);
return size;
}
#else
int32_t s3Init() { return 0; }
void s3CleanUp() {}
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
void s3DeleteObjects(const char *object_name[], int nobject) {}
bool s3Exists(const char *object_name) { return false; }
bool s3Get(const char *object_name, const char *path) { return false; }
void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; }
#endif
......@@ -96,6 +96,10 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup;
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
pMeta->openCtbCursor = metaOpenCtbCursor;
pMeta->closeCtbCursor = metaCloseCtbCursor;
pMeta->ctbCursorNext = metaCtbCursorNext;
}
void initTqAPI(SStoreTqReader* pTq) {
......
......@@ -14,6 +14,7 @@
*/
#include "vnd.h"
#include "vndCos.h"
typedef struct SVnodeTask SVnodeTask;
struct SVnodeTask {
......@@ -81,6 +82,9 @@ int vnodeInit(int nthreads) {
if (tqInit() < 0) {
return -1;
}
if (s3Init() < 0) {
return -1;
}
return 0;
}
......@@ -112,6 +116,7 @@ void vnodeCleanup() {
walCleanUp();
tqCleanUp();
smaCleanUp();
s3CleanUp();
}
int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg) {
......
......@@ -440,7 +440,7 @@ int32_t vnodeGetTableList(void* pVnode, int8_t type, SArray* pList) {
}
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid, 1);
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, uid, 1);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
......@@ -462,7 +462,7 @@ int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
SVnode *pVnodeObj = pVnode;
SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj->pMeta, suid, 1);
SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj, suid, 1);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
......@@ -521,7 +521,7 @@ int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo
}
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid, 0);
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 0);
if (!pCur) {
return TSDB_CODE_FAILED;
}
......
......@@ -291,17 +291,17 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: {
STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname);
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->scan.tableName.tname);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
if (pTagScanNode->pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->pScanPseudoCols->length);
if (pTagScanNode->scan.pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->scan.pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......@@ -309,11 +309,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pTagScanNode->node.pOutputDataBlockDesc->pSlots));
nodesGetOutputNumFromSlotList(pTagScanNode->scan.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pTagScanNode->node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pTagScanNode->node.pSlimit);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pTagScanNode->scan.node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pTagScanNode->scan.node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
......
......@@ -194,4 +194,6 @@ void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, b
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols);
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta);
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);
#endif // TDENGINE_EXECUTIL_H
......@@ -251,6 +251,12 @@ typedef struct STableMergeScanInfo {
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanFilterContext {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} STagScanFilterContext;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
......@@ -259,6 +265,14 @@ typedef struct STagScanInfo {
SLimitNode* pSlimit;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
uint64_t suid;
void* pCtbCursor;
SNode* pTagCond;
SNode* pTagIndexCond;
STagScanFilterContext filterCtx;
SArray* aUidTags; // SArray<STUidTagInfo>
SArray* aFilterIdxs; // SArray<int32_t>
SStorageAPI* pStorageAPI;
} STagScanInfo;
typedef enum EStreamScanMode {
......
......@@ -81,7 +81,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode*pTagIndexCond, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
......
......@@ -47,8 +47,6 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* p
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
......@@ -846,7 +844,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
return -1;
}
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI) {
SSDataBlock* pResBlock = createDataBlock();
if (pResBlock == NULL) {
......
......@@ -370,17 +370,18 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
if (!pTagScanPhyNode->onlyMetaCtbIdx) {
int32_t code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("failed to getTableList, code: %s", tstrerror(code));
return NULL;
}
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
}
pOperator = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
......
......@@ -2654,7 +2654,236 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
}
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
static void tagScanFreeUidTag(void* p) {
STUidTagInfo* pInfo = p;
if (pInfo->pTagVal != NULL) {
taosMemoryFree(pInfo->pTagVal);
}
}
static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
if (pColumnData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pColumnData->info.type = pType->type;
pColumnData->info.bytes = pType->bytes;
pColumnData->info.scale = pType->scale;
pColumnData->info.precision = pType->precision;
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pColumnData);
return terrno;
}
pParam->columnData = pColumnData;
pParam->colAlloced = true;
return TSDB_CODE_SUCCESS;
}
static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
SColumnNode* pSColumnNode = NULL;
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
pSColumnNode = *(SColumnNode**)pNode;
} else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
pSColumnNode = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pSColumnNode) {
return DEAL_RES_ERROR;
}
pSColumnNode->colId = -1;
pSColumnNode->colType = COLUMN_TYPE_TBNAME;
pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
nodesDestroyNode(*pNode);
*pNode = (SNode*)pSColumnNode;
} else {
return DEAL_RES_CONTINUE;
}
} else {
return DEAL_RES_CONTINUE;
}
STagScanFilterContext* pCtx = (STagScanFilterContext*)pContext;
void* data = taosHashGet(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
if (!data) {
taosHashPut(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
pSColumnNode->slotId = pCtx->index++;
SColumnInfo cInfo = {.colId = pSColumnNode->colId,
.type = pSColumnNode->node.resType.type,
.bytes = pSColumnNode->node.resType.bytes};
taosArrayPush(pCtx->cInfoList, &cInfo);
} else {
SColumnNode* col = *(SColumnNode**)data;
pSColumnNode->slotId = col->slotId;
}
return DEAL_RES_CONTINUE;
}
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) {
int32_t code = 0;
int32_t numOfTables = taosArrayGetSize(aUidTags);
SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(pBlockList, &pResBlock);
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
SScalarParam output = {0};
tagScanCreateResultData(&type, numOfTables, &output);
scalarCalculate(pTagCond, pBlockList, &output);
bool* result = (bool*)output.columnData->pData;
for (int32_t i = 0 ; i < numOfTables; ++i) {
if (result[i]) {
taosArrayPush(aFilterIdxs, &i);
}
}
colDataDestroy(output.columnData);
taosMemoryFreeClear(output.columnData);
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
}
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
// if (pUidTagInfo->name != NULL) {
// STR_TO_VARSTR(str, pUidTagInfo->name);
// } else { // name is not retrieved during filter
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
// }
STR_TO_VARSTR(str, "ctbidx");
colDataSetVal(pColInfo, rowIndex, str, false);
} else {
STagVal tagVal = {0};
tagVal.cid = pExprInfo->base.pParam[0].pCol->colId;
if (pUidTagInfo->pTagVal == NULL) {
colDataSetNULL(pColInfo, rowIndex);
} else {
const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pColInfo->info.type, &tagVal);
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
colDataSetNULL(pColInfo, rowIndex);
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
colDataSetVal(pColInfo, rowIndex, p, false);
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
varDataSetLen(tmp, tagVal.nData);
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
colDataSetVal(pColInfo, rowIndex, tmp, false);
taosMemoryFree(tmp);
} else {
colDataSetVal(pColInfo, rowIndex, (const char*)&tagVal.i64, false);
}
}
}
}
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs, bool ignoreFilterIdx,
SStorageAPI* pAPI) {
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
if (!ignoreFilterIdx) {
size_t szTables = taosArrayGetSize(aFilterIdxs);
for (int i = 0; i < szTables; ++i) {
int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i);
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
}
}
} else {
size_t szTables = taosArrayGetSize(aUidTags);
for (int i = 0; i < szTables; ++i) {
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
}
}
}
return 0;
}
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
STagScanInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
if (pInfo->pCtbCursor == NULL) {
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
}
SArray* aUidTags = pInfo->aUidTags;
SArray* aFilterIdxs = pInfo->aFilterIdxs;
int32_t count = 0;
while (1) {
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
taosArrayClear(aFilterIdxs);
int32_t numTables = 0;
while (numTables < pOperator->resultInfo.capacity) {
SMCtbCursor* pCur = pInfo->pCtbCursor;
tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor);
if (uid == 0) {
break;
}
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
info.pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
taosArrayPush(aUidTags, &info);
++numTables;
}
if (numTables == 0) {
break;
}
bool ignoreFilterIdx = true;
if (pInfo->pTagCond != NULL) {
ignoreFilterIdx = false;
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo);
} else {
ignoreFilterIdx = true;
}
tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, ignoreFilterIdx, pAPI);
count = ignoreFilterIdx ? taosArrayGetSize(aUidTags): taosArrayGetSize(aFilterIdxs);
if (count != 0) {
break;
}
}
pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -2712,14 +2941,23 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
static void destroyTagScanOperatorInfo(void* param) {
STagScanInfo* pInfo = (STagScanInfo*)param;
if (pInfo->pCtbCursor != NULL) {
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
}
taosHashCleanup(pInfo->filterCtx.colHash);
taosArrayDestroy(pInfo->filterCtx.cInfoList);
taosArrayDestroy(pInfo->aFilterIdxs);
taosArrayDestroyEx(pInfo->aUidTags, tagScanFreeUidTag);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
taosMemoryFreeClear(param);
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo) {
SScanPhysiNode* pPhyNode = (SScanPhysiNode*)pTagScanNode;
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -2741,6 +2979,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error;
}
pInfo->pTagCond = pTagCond;
pInfo->pTagIndexCond = pTagIndexCond;
pInfo->suid = pPhyNode->suid;
pInfo->pStorageAPI = &pTaskInfo->storageAPI;
pInfo->pTableListInfo = pTableListInfo;
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle;
......@@ -2752,8 +2995,18 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
if (pTagScanNode->onlyMetaCtbIdx) {
pInfo->aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
pInfo->aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
pInfo->filterCtx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
pInfo->filterCtx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if (pInfo->pTagCond != NULL) {
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
}
}
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
return pOperator;
......
......@@ -848,7 +848,7 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
char path[PATH_MAX] = {0};
udfdGetFuncBodyPath(udf, path);
bool fileExist = !(taosStatFile(path, NULL, NULL) < 0);
bool fileExist = !(taosStatFile(path, NULL, NULL, NULL) < 0);
if (fileExist) {
strncpy(udf->path, path, PATH_MAX);
fnInfo("udfd func body file. reuse existing file %s", path);
......
......@@ -162,7 +162,7 @@ static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) {
return ctx->offset;
} else {
int64_t file_size = 0;
taosStatFile(ctx->file.buf, &file_size, NULL);
taosStatFile(ctx->file.buf, &file_size, NULL, NULL);
return (int)file_size;
}
}
......@@ -199,7 +199,7 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
code = taosFtruncateFile(ctx->file.pFile, 0);
UNUSED(code);
code = taosStatFile(path, &ctx->file.size, NULL);
code = taosStatFile(path, &ctx->file.size, NULL, NULL);
UNUSED(code);
ctx->file.wBufOffset = 0;
......
......@@ -564,7 +564,9 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) {
}
static int32_t physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) {
return physiScanCopy(pSrc, pDst);
COPY_BASE_OBJECT_FIELD(scan, physiScanCopy);
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
return TSDB_CODE_SUCCESS;
}
static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhysiNode* pDst) {
......
......@@ -662,6 +662,7 @@ static const char* jkScanLogicPlanDynamicScanFuncs = "DynamicScanFuncs";
static const char* jkScanLogicPlanDataRequired = "DataRequired";
static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanGroupTags = "GroupTags";
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
......@@ -703,7 +704,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanGroupTags, pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx);
}
return code;
}
......@@ -748,6 +751,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanLogicPlanGroupTags, &pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx);
}
return code;
}
......@@ -1564,7 +1570,7 @@ static const char* jkScanPhysiPlanTableName = "TableName";
static const char* jkScanPhysiPlanGroupOrderScan = "GroupOrderScan";
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
const SScanPhysiNode* pNode = (const SScanPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
......@@ -1593,7 +1599,7 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
}
static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
SScanPhysiNode* pNode = (SScanPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
......@@ -1621,6 +1627,30 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkTagScanPhysiOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
int32_t code = physiScanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTagScanPhysiOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx);
}
return code;
}
static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
int32_t code = jsonToPhysiScanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTagScanPhysiOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx);
}
return code;
}
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
......@@ -6592,6 +6622,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_LOGIC_PLAN:
return logicPlanToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return physiTagScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
return physiScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
......@@ -6910,6 +6941,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_LOGIC_PLAN:
return jsonToLogicPlan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return jsonToPhysiTagScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return jsonToPhysiScanNode(pJson, pObj);
......
......@@ -2003,6 +2003,43 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum {
PHY_TAG_SCAN_CODE_SCAN = 1,
PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX
};
static int32_t physiTagScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX, pNode->onlyMetaCtbIdx);
}
return code;
}
static int32_t msgToPhysiTagScanNode(STlvDecoder* pDecoder, void* pObj) {
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_TAG_SCAN_CODE_SCAN:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan);
break;
case PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX:
code = tlvDecodeBool(pTlv, &pNode->onlyMetaCtbIdx);
break;
default:
break;
}
}
return code;
}
enum {
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
......@@ -3726,6 +3763,8 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = caseWhenNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
code = physiTagScanNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
code = physiScanNodeToMsg(pObj, pEncoder);
break;
......@@ -3869,6 +3908,8 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
code = msgToCaseWhenNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
code = msgToPhysiTagScanNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
code = msgToPhysiScanNode(pDecoder, pObj);
break;
......
......@@ -199,9 +199,10 @@ TEST_F(NodesCloneTest, physiScan) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
STagScanPhysiNode* pSrcNode = (STagScanPhysiNode*)pSrc;
STagScanPhysiNode* pDstNode = (STagScanPhysiNode*)pDst;
ASSERT_EQ(pSrcNode->uid, pDstNode->uid);
ASSERT_EQ(pSrcNode->suid, pDstNode->suid);
ASSERT_EQ(pSrcNode->tableType, pDstNode->tableType);
ASSERT_EQ(pSrcNode->scan.uid, pDstNode->scan.uid);
ASSERT_EQ(pSrcNode->scan.suid, pDstNode->scan.suid);
ASSERT_EQ(pSrcNode->scan.tableType, pDstNode->scan.tableType);
ASSERT_EQ(pSrcNode->onlyMetaCtbIdx, pDstNode->onlyMetaCtbIdx);
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
......
......@@ -2679,6 +2679,9 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
}
nodesDestroyNode((SNode*)pAgg);
tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode);
pScanNode->onlyMetaCtbIdx = false;
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
}
......
......@@ -511,6 +511,20 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
}
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) {
STagScanPhysiNode* pScan =
(STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}
static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) {
SLastRowScanPhysiNode* pScan =
......@@ -646,6 +660,7 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
pCxt->hasScan = true;
switch (pScanLogicNode->scanType) {
case SCAN_TYPE_TAG:
return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_BLOCK_INFO:
return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_TABLE_COUNT:
......
......@@ -42,7 +42,7 @@ int32_t raftStoreReadFile(SSyncNode *pNode) {
const char *file = pNode->raftStorePath;
SRaftStore *pStore = &pNode->raftStore;
if (taosStatFile(file, NULL, NULL) < 0) {
if (taosStatFile(file, NULL, NULL, NULL) < 0) {
sInfo("vgId:%d, raft store file:%s not exist, use default value", pNode->vgId, file);
pStore->currentTerm = 0;
pStore->voteFor.addr = 0;
......
此差异已折叠。
......@@ -191,7 +191,7 @@ int32_t taosRenameFile(const char *oldName, const char *newName) {
#endif
}
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *atime) {
#ifdef WINDOWS
struct _stati64 fileStat;
int32_t code = _stati64(path, &fileStat);
......@@ -211,6 +211,10 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
*mtime = fileStat.st_mtime;
}
if (atime != NULL) {
*atime = fileStat.st_mtime;
}
return 0;
}
int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
......
此差异已折叠。
......@@ -60,6 +60,7 @@ docker run \
-v /root/.cargo/git:/root/.cargo/git \
-v /root/go/pkg/mod:/root/go/pkg/mod \
-v /root/.cache/go-build:/root/.cache/go-build \
-v /root/.cos-local.1:/root/.cos-local.1 \
-v ${REP_REAL_PATH}/enterprise/src/plugins/taosx/target:${REP_DIR}/enterprise/src/plugins/taosx/target \
-v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \
-v ${REP_REAL_PATH}/community/contrib/cJson/:${REP_DIR}/community/contrib/cJson \
......@@ -88,6 +89,7 @@ docker run \
-v /root/.cargo/git:/root/.cargo/git \
-v /root/go/pkg/mod:/root/go/pkg/mod \
-v /root/.cache/go-build:/root/.cache/go-build \
-v /root/.cos-local.1:/root/.cos-local.1 \
-v ${REP_REAL_PATH}/enterprise/src/plugins/taosx/target:${REP_DIR}/enterprise/src/plugins/taosx/target \
-v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \
-v ${REP_REAL_PATH}/community/contrib/cJson/:${REP_DIR}/community/contrib/cJson \
......
此差异已折叠。
......@@ -221,7 +221,7 @@ int64_t getDirectorySize(char* dir) {
totalSize += subDirSize;
} else if (0 == strcmp(strchr(fileName, '.'), ".log")) { // only calc .log file size, and not include .idx file
int64_t file_size = 0;
taosStatFile(subdir, &file_size, NULL);
taosStatFile(subdir, &file_size, NULL, NULL);
totalSize += file_size;
}
}
......@@ -702,4 +702,3 @@ int main(int32_t argc, char* argv[]) {
taosCloseFile(&g_fp);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册