提交 d7a525f2 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into feature/3.0_debug_wxy

......@@ -4,9 +4,6 @@
[submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension
url = git@github.com:taosdata/hivemq-tdengine-extension.git
[submodule "deps/jemalloc"]
path = deps/jemalloc
url = https://github.com/jemalloc/jemalloc
[submodule "deps/TSZ"]
path = deps/TSZ
url = https://github.com/taosdata/TSZ.git
......
......@@ -84,6 +84,12 @@ ELSE ()
ENDIF ()
ENDIF ()
option(
JEMALLOC_ENABLED
"If build with jemalloc"
OFF
)
option(
BUILD_SANITIZER
"If build sanitizer"
......
# jemalloc
ExternalProject_Add(jemalloc
GIT_REPOSITORY https://github.com/jemalloc/jemalloc.git
GIT_TAG 5.3.0
SOURCE_DIR "${TD_CONTRIB_DIR}/jemalloc"
BINARY_DIR ""
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
GIT_SHALLOW true
GIT_PROGRESS true
)
......@@ -2,7 +2,7 @@
# zlib
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 817cb6a
GIT_TAG 2.1.1
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -27,6 +27,10 @@ else ()
cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
if(TD_LINUX_64 AND JEMALLOC_ENABLED)
cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# pthread
if(${BUILD_PTHREAD})
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
......@@ -399,6 +403,18 @@ if(${BUILD_ADDR2LINE})
endif(NOT ${TD_WINDOWS})
endif(${BUILD_ADDR2LINE})
# jemalloc
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
include(ExternalProject)
ExternalProject_Add(jemalloc
PREFIX "jemalloc"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
BUILD_COMMAND ${MAKE}
)
INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include)
ENDIF ()
# ================================================================================================
# Build test
......
此差异已折叠。
此差异已折叠。
......@@ -41,10 +41,8 @@ extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults;
extern int32_t tsCompatibleModel;
extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth;
extern int64_t tsTickPerMin[3];
extern int32_t tsCountAlwaysReturnValue;
// multi-process
......@@ -92,8 +90,6 @@ extern uint16_t tsTelemPort;
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
extern bool tsRetrieveBlockingModel; // retrieve threads will be blocked
extern bool tsKeepOriginalColumnName;
extern bool tsDeadLockKillQuery;
// query client
extern int32_t tsQueryPolicy;
......@@ -102,11 +98,6 @@ extern int32_t tsQuerySmaOptimize;
// client
extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay;
extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int64_t tsMaxRetentWindow;
// build info
extern char version[];
......
......@@ -55,7 +55,6 @@ enum {
TASK_INPUT_STATUS__NORMAL = 1,
TASK_INPUT_STATUS__BLOCKED,
TASK_INPUT_STATUS__RECOVER,
TASK_INPUT_STATUS__PROCESSING,
TASK_INPUT_STATUS__STOP,
TASK_INPUT_STATUS__FAILED,
};
......@@ -320,17 +319,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
#if 0
while (1) {
int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
break;
}
ASSERT(0);
}
#endif
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
if (pSubmitClone == NULL) {
......@@ -443,13 +431,14 @@ typedef struct {
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t sourceTaskId;
int32_t sourceVg;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
} SStreamTaskRecoverReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t rspTaskId;
int32_t reqTaskId;
int8_t inputStatus;
} SStreamTaskRecoverRsp;
......
......@@ -210,7 +210,7 @@ SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
bool syncEnvIsStart();
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
......
......@@ -238,7 +238,7 @@ typedef struct SyncClientRequestBatch {
char data[]; // block2, block3
} SyncClientRequestBatch;
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId);
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
......
......@@ -257,14 +257,13 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
{TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)},
{TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
// {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)},
// {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
{TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)},
{TSDB_INS_TABLE_USER_DATABASES, userDBSchema, tListLen(userDBSchema)},
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
{TSDB_INS_TABLE_USER_TAGS, userTagsSchema, tListLen(userTagsSchema)},
// {TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
......
......@@ -1713,7 +1713,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type,
printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type,
pDataBlock->info.childId, pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
......
......@@ -35,7 +35,6 @@ int32_t tsNumOfSupportVnodes = 256;
// common
int32_t tsMaxShellConns = 50000;
int32_t tsShellActivityTimer = 3; // second
bool tsEnableSlaveQuery = true;
bool tsPrintAuth = false;
// multi process
......@@ -118,20 +117,6 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 database precision unit for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f;
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled
......@@ -142,12 +127,6 @@ int64_t tsQueryBufferSizeBytes = -1;
// in retrieve blocking model, the retrieve threads will wait for the completion of the query processing.
bool tsRetrieveBlockingModel = false;
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
bool tsKeepOriginalColumnName = false;
// kill long query
bool tsDeadLockKillQuery = false;
// tsdb config
// For backward compatibility
bool tsdbForceKeepFile = false;
......@@ -330,11 +309,10 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTmpDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
......@@ -383,15 +361,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "streamCompDelayRatio", tsStreamComputDelayRatio, 0.1, 0.9, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "retrieveBlockingModel", tsRetrieveBlockingModel, 0) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
......@@ -399,7 +371,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
......@@ -409,25 +381,21 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
tsNumOfMnodeQueryThreads = tsNumOfCores * 2;
tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 4, 8);
if (cfgAddInt32(pCfg, "numOfMnodeQueryThreads", tsNumOfMnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfMnodeReadThreads = tsNumOfCores / 8;
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeQueryThreads = tsNumOfCores * 2;
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeWriteThreads = tsNumOfCores;
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
......@@ -447,11 +415,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 2, 1024, 0) != 0) return -1;
tsNumOfSnodeUniqueThreads = tsNumOfCores / 4;
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 2, 1024, 0) != 0) return -1;
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, TSDB_MAX_MSG_SIZE * 10000L);
......@@ -532,7 +500,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1;
......@@ -545,7 +513,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32;
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
......@@ -579,15 +546,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32;
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->i32;
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32;
......@@ -598,7 +559,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32;
......@@ -673,9 +633,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'd': {
if (strcasecmp("deadLockKillQuery", name) == 0) {
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->i32;
} else if (strcasecmp("dDebugFlag", name) == 0) {
if (strcasecmp("dDebugFlag", name) == 0) {
dDebugFlag = cfgGetItem(pCfg, "dDebugFlag")->i32;
}
break;
......@@ -732,9 +690,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'k': {
if (strcasecmp("keepColumnName", name) == 0) {
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
}
break;
}
case 'l': {
......@@ -758,10 +713,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
} else if (strcasecmp("maxNumOfDistinctRes", name) == 0) {
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
} else if (strcasecmp("maxStreamCompDelay", name) == 0) {
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
} else if (strcasecmp("maxFirstStreamCompDelay", name) == 0) {
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
}
break;
}
......@@ -772,8 +723,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'i': {
if (strcasecmp("minimalTempDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
} else if (strcasecmp("minimalDataDirGB", name) == 0) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
} else if (strcasecmp("minSlidingTime", name) == 0) {
......@@ -834,8 +785,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
} else if (strcasecmp("numOfCommitThreads", name) == 0) {
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
} else if (strcasecmp("numOfMnodeQueryThreads", name) == 0) {
tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32;
} else if (strcasecmp("numOfMnodeReadThreads", name) == 0) {
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
} else if (strcasecmp("numOfVnodeQueryThreads", name) == 0) {
......@@ -883,9 +832,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'r': {
if (strcasecmp("retryStreamCompDelay", name) == 0) {
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
} else if (strcasecmp("retrieveBlockingModel", name) == 0) {
if (strcasecmp("retrieveBlockingModel", name) == 0) {
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
} else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
......@@ -913,10 +860,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
} else if (strcasecmp("statusInterval", name) == 0) {
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
} else if (strcasecmp("streamCompDelayRatio", name) == 0) {
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
} else if (strcasecmp("slaveQuery", name) == 0) {
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
} else if (strcasecmp("snodeShmSize", name) == 0) {
tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32;
} else if (strcasecmp("serverPort", name) == 0) {
......
......@@ -14,4 +14,7 @@ target_include_directories(
taosd
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/node_mgmt/inc"
)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
add_dependencies(taosd jemalloc)
ENDIF ()
target_link_libraries(taosd dnode)
......@@ -179,7 +179,7 @@ static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRecoverRsp *pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
int32_t taskId = pRsp->rspTaskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
streamProcessRecoverRsp(pTask, pRsp);
return 0;
......
......@@ -293,7 +293,10 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c) {
ASSERT(0);
tdbTbcClose(pUidIdxc);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
// ASSERT(0);
return -1;
}
......@@ -980,6 +983,9 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tbDbKey.version = pME->version;
tbDbKey.uid = pME->uid;
metaDebug("vgId:%d, start to save table version:%" PRId64 "uid: %" PRId64, TD_VID(pMeta->pVnode), pME->version,
pME->uid);
pKey = &tbDbKey;
kLen = sizeof(tbDbKey);
......@@ -1012,6 +1018,9 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
return 0;
_err:
metaError("vgId:%d, failed to save table version:%" PRId64 "uid: %" PRId64 " %s", TD_VID(pMeta->pVnode), pME->version,
pME->uid, tstrerror(terrno));
taosMemoryFree(pVal);
return -1;
}
......
......@@ -599,14 +599,14 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma),
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
suid, pItem->level, terrstr());
goto _err;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// SRsmaSnapReader ========================================
struct SRsmaSnapReader {
SSma* pSma;
int64_t sver;
int64_t ever;
// for data file
int8_t rsmaDataDone[TSDB_RETENTION_L2];
STsdbSnapReader* pDataReader[TSDB_RETENTION_L2];
// for qtaskinfo file
int8_t qTaskDone;
SQTaskFReader* pQTaskFReader;
};
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) {
int32_t code = 0;
SRsmaSnapReader* pReader = NULL;
// alloc
pReader = (SRsmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pSma = pSma;
pReader->sver = sver;
pReader->ever = ever;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, &pReader->pDataReader[i]);
if (code < 0) {
goto _err;
}
}
}
*ppReader = pReader;
smaInfo("vgId:%d vnode snapshot rsma reader opened succeed", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d vnode snapshot rsma reader opened failed since %s", SMA_VID(pSma), tstrerror(code));
return TSDB_CODE_FAILED;
}
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
SSma* pSma = pReader->pSma;
_exit:
smaInfo("vgId:%d vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma));
return code;
_err:
smaError("vgId:%d vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code));
return code;
}
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
*ppData = NULL;
smaInfo("vgId:%d vnode snapshot rsma read entry", SMA_VID(pReader->pSma));
// read rsma1/rsma2 file
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
STsdbSnapReader* pTsdbSnapReader = pReader->pDataReader[i];
if (!pTsdbSnapReader) {
continue;
}
if (!pReader->rsmaDataDone[i]) {
smaInfo("vgId:%d vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), i);
code = tsdbSnapRead(pTsdbSnapReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->rsmaDataDone[i] = 1;
}
}
} else {
smaInfo("vgId:%d vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), i);
}
}
// read qtaskinfo file
if (!pReader->qTaskDone) {
code = rsmaSnapReadQTaskInfo(pReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->qTaskDone = 1;
}
}
}
_exit:
smaInfo("vgId:%d vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
return code;
_err:
smaError("vgId:%d vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
return code;
}
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
int32_t code = 0;
SRsmaSnapReader* pReader = *ppReader;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pReader->pDataReader[i]) {
tsdbSnapReaderClose(&pReader->pDataReader[i]);
}
}
if (pReader->pQTaskFReader) {
// TODO: close for qtaskinfo
smaInfo("vgId:%d vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma));
}
smaInfo("vgId:%d vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
taosMemoryFreeClear(*ppReader);
return code;
}
// SRsmaSnapWriter ========================================
struct SRsmaSnapWriter {
SSma* pSma;
int64_t sver;
int64_t ever;
// config
int64_t commitID;
// for data file
STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
// for qtaskinfo file
SQTaskFReader* pQTaskFReader;
SQTaskFWriter* pQTaskFWriter;
};
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) {
int32_t code = 0;
SRsmaSnapWriter* pWriter = NULL;
// alloc
pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pSma = pSma;
pWriter->sver = sver;
pWriter->ever = ever;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]);
if (code < 0) {
goto _err;
}
}
}
// qtaskinfo
// TODO
*ppWriter = pWriter;
smaInfo("vgId:%d rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
return code;
_err:
smaError("vgId:%d rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
SRsmaSnapWriter* pWriter = *ppWriter;
if (rollback) {
ASSERT(0);
// code = tsdbFSRollback(pWriter->pTsdb->pFS);
// if (code) goto _err;
} else {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pWriter->pDataWriter[i]) {
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
if (code) goto _err;
}
}
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
return code;
_err:
smaError("vgId:%d vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
return code;
}
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
// rsma1/rsma2
if (pHdr->type == SNAP_DATA_RSMA1) {
pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[0], pData, nData);
} else if (pHdr->type == SNAP_DATA_RSMA2) {
pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
} else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
}
if (code < 0) goto _err;
_exit:
smaInfo("vgId:%d rsma snapshot write for data %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
return code;
_err:
smaError("vgId:%d rsma snapshot write for data %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
tstrerror(code));
return code;
}
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
if (pWriter->pQTaskFWriter == NULL) {
// SDelFile* pDelFile = pWriter->fs.pDelFile;
// // reader
// if (pDelFile) {
// code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
// if (code) goto _err;
// code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL);
// if (code) goto _err;
// }
// // writer
// SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0};
// code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
// if (code) goto _err;
}
smaInfo("vgId:%d vnode snapshot rsma write qtaskinfo succeed", SMA_VID(pWriter->pSma));
_exit:
return code;
_err:
smaError("vgId:%d vnode snapshot rsma write qtaskinfo failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
return code;
}
......@@ -796,7 +796,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
int32_t taskId = pRsp->rspTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverRsp(*ppTask, pRsp);
......
......@@ -46,11 +46,6 @@ void tsdbCloseCache(SLRUCache *pCache) {
}
}
/* static void getTableCacheKeyS(tb_uid_t uid, const char *cacheType, char *key, int *len) { */
/* snprintf(key, 30, "%" PRIi64 "%s", uid, cacheType); */
/* *len = strlen(key); */
/* } */
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
if (cacheType == 0) { // last_row
*(uint64_t *)key = (uint64_t)uid;
......@@ -649,44 +644,44 @@ _err:
return code;
}
static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) {
int32_t code = 0;
/* static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) { */
/* int32_t code = 0; */
SColVal *pColVal = &(SColVal){0};
/* SColVal *pColVal = &(SColVal){0}; */
if (pRow->type == 0) {
*ppRow = tdRowDup(pRow->pTSRow);
} else {
SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
if (pArray == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
/* if (pRow->type == 0) { */
/* *ppRow = tdRowDup(pRow->pTSRow); */
/* } else { */
/* SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); */
/* if (pArray == NULL) { */
/* code = TSDB_CODE_OUT_OF_MEMORY; */
/* goto _exit; */
/* } */
TSDBKEY key = TSDBROW_KEY(pRow);
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts});
/* TSDBKEY key = TSDBROW_KEY(pRow); */
/* STColumn *pTColumn = &pTSchema->columns[0]; */
/* *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); */
if (taosArrayPush(pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
/* if (taosArrayPush(pArray, pColVal) == NULL) { */
/* code = TSDB_CODE_OUT_OF_MEMORY; */
/* goto _exit; */
/* } */
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (taosArrayPush(pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
/* for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { */
/* tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); */
/* if (taosArrayPush(pArray, pColVal) == NULL) { */
/* code = TSDB_CODE_OUT_OF_MEMORY; */
/* goto _exit; */
/* } */
/* } */
code = tdSTSRowNew(pArray, pTSchema, ppRow);
if (code) goto _exit;
}
/* code = tdSTSRowNew(pArray, pTSchema, ppRow); */
/* if (code) goto _exit; */
/* } */
_exit:
return code;
}
/* _exit: */
/* return code; */
/* } */
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
bool deleted = false;
......
......@@ -413,7 +413,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
syncClientRequestBatchDestroyDeep(pSyncMsg);
syncClientRequestBatchDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
......
......@@ -318,6 +318,7 @@ typedef struct STableScanInfo {
int32_t currentTable;
int8_t scanMode;
int8_t noTable;
int8_t assignBlockUid;
} STableScanInfo;
typedef struct STableMergeScanInfo {
......
......@@ -264,6 +264,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
if (pTaskInfo->tableqinfoList.map == NULL) {
pTaskInfo->tableqinfoList.map =
taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
}
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
}
if (keyBuf != NULL) {
......
......@@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
}
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) {
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
int32_t rightPos) {
SJoinOperatorInfo* pJoinInfo = pOperator->info;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
......@@ -129,7 +130,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock*
int32_t rowIndex = -1;
SColumnInfoData* pSrc = NULL;
if (pJoinInfo->pLeft->info.blockId == blockId) {
if (pLeftBlock->info.blockId == blockId) {
pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
rowIndex = leftPos;
} else {
......@@ -144,7 +145,128 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock*
colDataAppend(pDst, currRow, p, false);
}
}
}
typedef struct SRowLocation {
SSDataBlock* pDataBlock;
int32_t pos;
} SRowLocation;
// pBlock[tsSlotId][startPos, endPos) == timestamp,
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
int32_t numRows = pBlock->info.rows;
ASSERT(startPos < numRows);
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
int32_t i = startPos;
for (; i < numRows; ++i) {
char* pNextVal = colDataGetData(pCol, i);
if (timestamp != *(int64_t*)pNextVal) {
break;
}
}
int32_t endPos = i;
*pEndPos = endPos;
if (endPos - startPos == 0) {
return 0;
}
SSDataBlock* block = pBlock;
bool createdNewBlock = false;
if (endPos == numRows) {
block = blockDataExtractBlock(pBlock, startPos, endPos-startPos);
taosArrayPush(createdBlocks, &block);
createdNewBlock = true;
}
SRowLocation location = {0};
for (int32_t j = startPos; j < endPos; ++j) {
location.pDataBlock = block;
location.pos = ( createdNewBlock ? j - startPos : j);
taosArrayPush(rowLocations, &location);
}
return 0;
}
// whichChild == 0, left child of join; whichChild ==1, right child of join
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
SSDataBlock* startDataBlock, int32_t startPos,
int64_t timestamp, SArray* rowLocations,
SArray* createdBlocks) {
ASSERT(whichChild == 0 || whichChild == 1);
SJoinOperatorInfo* pJoinInfo = pOperator->info;
int32_t endPos = -1;
SSDataBlock* dataBlock = startDataBlock;
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);
while (endPos == dataBlock->info.rows) {
SOperatorInfo* ds = pOperator->pDownstream[whichChild];
dataBlock = ds->fpSet.getNextFn(ds);
if (whichChild == 0) {
pJoinInfo->leftPos = 0;
pJoinInfo->pLeft = dataBlock;
} else if (whichChild == 1) {
pJoinInfo->rightPos = 0;
pJoinInfo->pRight = dataBlock;
}
if (dataBlock == NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
endPos = -1;
break;
}
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
}
if (endPos != -1) {
if (whichChild == 0) {
pJoinInfo->leftPos = endPos;
} else if (whichChild == 1) {
pJoinInfo->rightPos = endPos;
}
}
return 0;
}
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
int32_t* nRows) {
SJoinOperatorInfo* pJoinInfo = pOperator->info;
SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
for (int32_t i = 0; i < leftNumJoin; ++i) {
for (int32_t j = 0; j < rightNumJoin; ++j) {
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
}
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(rightCreatedBlocks);
taosArrayDestroy(rightRowLocations);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
return TSDB_CODE_SUCCESS;
}
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
......@@ -201,12 +323,9 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
}
if (leftTs == rightTs) {
mergeJoinJoinLeftRight(pOperator, pRes, nrows,
pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos);
pJoinInfo->leftPos += 1;
pJoinInfo->rightPos += 1;
nrows += 1;
mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight,
pJoinInfo->rightPos);
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
} else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) {
pJoinInfo->leftPos += 1;
......
......@@ -408,6 +408,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pBlock->info.groupId = *groupId;
}
if (pTableScanInfo->assignBlockUid) {
pBlock->info.groupId = pBlock->info.uid;
}
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......@@ -616,6 +620,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->currentGroupId = -1;
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pOperator->name = "TableScanOperator"; // for debug purpose
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
......
......@@ -1091,6 +1091,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true);
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
}
......@@ -2098,9 +2100,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL:
case TSDB_FILL_NULL: {
colDataAppendNULL(pDst, rows);
pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_SET_VALUE: {
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;
......@@ -2118,9 +2122,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
}
} break;
pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_LINEAR:
case TSDB_FILL_LINEAR: {
#if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
......@@ -2151,17 +2157,22 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
}
}
#endif
// TODO: pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_PREV: {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, rows, pkey->pData, false);
} break;
pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_NEXT: {
char* p = colDataGetData(pSrc, rowIndex);
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex));
} break;
pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_NONE:
default:
......@@ -2169,7 +2180,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
}
}
pResBlock->info.rows += 1;
}
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
......@@ -2221,6 +2231,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
blockDataCleanup(pResBlock);
int32_t numOfRows = 0;
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
......
......@@ -1522,6 +1522,7 @@ static const char* jkTableScanPhysiPlanWatermark = "Watermark";
static const char* jkTableScanPhysiPlanIgnoreExpired = "IgnoreExpired";
static const char* jkTableScanPhysiPlanGroupTags = "GroupTags";
static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
......@@ -1578,6 +1579,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid);
}
return code;
}
......@@ -1637,6 +1641,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid);
}
return code;
}
......@@ -4525,7 +4532,6 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
return jsonToPhysiScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return jsonToPhysiLastRowScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
......
......@@ -82,6 +82,7 @@ static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) {
COMPARE_STRING_FIELD(dbName);
COMPARE_STRING_FIELD(tableName);
COMPARE_STRING_FIELD(colName);
COMPARE_STRING_FIELD(tableAlias);
return true;
}
......
......@@ -2336,7 +2336,7 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
FOREACH(pAggTarget, pAgg->pTargets) {
SNode* pScanTarget = NULL;
FOREACH(pScanTarget, pScanNode->node.pTargets) {
if (0 == strcmp(((SColumnNode*)pAggTarget)->colName, ((SColumnNode*)pAggTarget)->colName)) {
if (0 == strcmp(((SColumnNode*)pAggTarget)->colName, ((SColumnNode*)pScanTarget)->colName)) {
nodesListAppend(pScanTargets, nodesCloneNode(pScanTarget));
break;
}
......
......@@ -168,20 +168,20 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(status));
int8_t jobStatus = 0;
if (schJobNeedToStop(pJob, &jobStatus)) {
SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_TASK_ELOG("task already not in EXEC status, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
......
......@@ -32,10 +32,10 @@ typedef struct {
static SStreamGlobalEnv streamEnv;
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamExec(SStreamTask* pTask);
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......
......@@ -189,7 +189,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
#if 0
if (pTask->execType != TASK_EXEC__NONE) {
#endif
streamExec(pTask, pTask->pMsgCb);
streamExec(pTask);
#if 0
} else {
ASSERT(pTask->sinkType != TASK_SINK__NONE);
......@@ -208,7 +208,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// 3.2 dispatch / sink
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
}
return 0;
......@@ -233,26 +233,55 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
return 0;
}
// continue dispatch
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
return 0;
}
int32_t streamProcessRunReq(SStreamTask* pTask) {
streamExec(pTask, pTask->pMsgCb);
streamExec(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
}
return 0;
}
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
//
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamTaskRecoverRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = pTask->inputStatus;
pCont->streamId = pTask->streamId;
pCont->reqTaskId = pTask->taskId;
pCont->rspTaskId = pReq->upstreamTaskId;
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp);
tmsgSendRsp(pRsp);
return 0;
}
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
//
if (pRsp->inputStatus == TASK_INPUT_STATUS__NORMAL) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
streamProcessRunReq(pTask);
if (pTask->isDataScan) {
// scan data to recover
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
pTask->taskStatus = TASK_STATUS__RECOVERING;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
if (streamPipelineExec(pTask, 100) < 0) {
return -1;
}
} else {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->taskStatus = TASK_STATUS__NORMAL;
}
}
return 0;
}
......@@ -262,10 +291,10 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->execType != TASK_EXEC__NONE);
streamExec(pTask, pTask->pMsgCb);
streamExec(pTask);
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
return 0;
}
......
......@@ -438,7 +438,7 @@ FAIL:
return code;
}
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
#if 1
int8_t old =
......
......@@ -141,7 +141,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
}
}
......@@ -229,7 +229,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
// TODO: handle version
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
int32_t streamExec(SStreamTask* pTask) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) return -1;
while (1) {
......
......@@ -19,8 +19,8 @@ int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecover
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
......@@ -29,8 +29,8 @@ int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* p
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
......@@ -38,7 +38,8 @@ int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* p
int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->reqTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->rspTaskId) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->inputStatus) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
......@@ -47,7 +48,8 @@ int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecover
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->reqTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->rspTaskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->inputStatus) < 0) return -1;
tEndDecode(pDecoder);
return 0;
......@@ -125,7 +127,7 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
}
if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
if (streamPipelineExec(pTask, 10) < 0) {
if (streamPipelineExec(pTask, 100) < 0) {
// set fail
return -1;
}
......
......@@ -170,7 +170,7 @@ void syncNodeStart(SSyncNode* pSyncNode);
void syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
// option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
......
......@@ -677,7 +677,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return ret;
}
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) {
int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) {
if (arrSize < 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
......@@ -690,18 +690,18 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_
}
ASSERT(rid == pSyncNode->rid);
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgPArr, pIsWeakArr, arrSize);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) {
static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) {
for (int32_t i = 0; i < arrSize; ++i) {
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE) {
if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE) {
return false;
}
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
return false;
}
}
......@@ -709,8 +709,8 @@ static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) {
return true;
}
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) {
if (!syncNodeBatchOK(pMsgArr, arrSize)) {
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) {
if (!syncNodeBatchOK(pMsgPArr, arrSize)) {
syncNodeErrorLog(pSyncNode, "sync propose batch error");
terrno = TSDB_CODE_SYN_BATCH_ERROR;
return -1;
......@@ -736,16 +736,23 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
SRaftMeta raftArr[SYNC_MAX_BATCH_SIZE];
for (int i = 0; i < arrSize; ++i) {
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d, batch:%d", TMSG_INFO(pMsgPArr[i]->msgType),
pMsgPArr[i]->msgType, arrSize);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
SRespStub stub;
stub.createTime = taosGetTimestampMs();
stub.rpcMsg = pMsgArr[i];
stub.rpcMsg = *(pMsgPArr[i]);
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
raftArr[i].isWeak = pIsWeakArr[i];
raftArr[i].seqNum = seqNum;
}
SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgArr, raftArr, arrSize, pSyncNode->vgId);
SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgPArr, raftArr, arrSize, pSyncNode->vgId);
ASSERT(pSyncMsg != NULL);
SRpcMsg rpcMsg;
......@@ -759,7 +766,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg);
ASSERT(arrSize == pSyncMsg->dataCount);
for (int i = 0; i < arrSize; ++i) {
pMsgArr[i].info.conn.applyIndex = msgArr[i].info.conn.applyIndex;
pMsgPArr[i]->info.conn.applyIndex = msgArr[i].info.conn.applyIndex;
syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum);
}
......@@ -790,9 +797,11 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
......@@ -860,7 +869,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else {
ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("vgId:%d, sync propose not leader, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
sError("vgId:%d, sync propose not leader, %s, msgtype:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), TMSG_INFO(pMsg->msgType), pMsg->msgType);
goto _END;
}
......
......@@ -963,9 +963,9 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
// block2: SRaftMeta array
// block3: rpc msg array (with pCont)
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId) {
ASSERT(rpcMsgArr != NULL);
ASSERT(rpcMsgPArr != NULL);
ASSERT(arrSize > 0);
int32_t dataLen = 0;
......@@ -991,7 +991,7 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet
raftMetaArr[i].seqNum = raftArr[i].seqNum;
// init msgArr
msgArr[i] = rpcMsgArr[i];
msgArr[i] = *(rpcMsgPArr[i]);
}
return pMsg;
......
......@@ -22,25 +22,25 @@ done
echo ""
echo "generate vgId ..."
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | sort | uniq > ${logpath}/log.vgIds.tmp
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' > ${logpath}/log.vgIds.tmp
echo "all vgIds:" > ${logpath}/log.vgIds
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort | uniq >> ${logpath}/log.vgIds
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' >> ${logpath}/log.vgIds
for dnode in `ls ${logpath} | grep dnode | grep -v log`;do
echo "" >> ${logpath}/log.vgIds
echo "" >> ${logpath}/log.vgIds
echo "${dnode}:" >> ${logpath}/log.vgIds
cat ${logpath}/${dnode}/log/taosdlog.* | grep SYN | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort | uniq >> ${logpath}/log.vgIds
cat ${logpath}/${dnode}/log/taosdlog.* | grep SYN | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' >> ${logpath}/log.vgIds
done
echo ""
echo "generate log.dnode.vgId ..."
for logdnode in `ls ${logpath}/log.dnode*`;do
for vgId in `cat ${logpath}/log.vgIds.tmp`;do
rowNum=`cat ${logdnode} | grep "${vgId}" | awk 'BEGIN{rowNum=0}{rowNum++}END{print rowNum}'`
rowNum=`cat ${logdnode} | grep "${vgId}," | awk 'BEGIN{rowNum=0}{rowNum++}END{print rowNum}'`
#echo "-----${rowNum}"
if [ $rowNum -gt 0 ] ; then
echo "generate ${logdnode}.${vgId}"
cat ${logdnode} | grep "${vgId}" > ${logdnode}.${vgId}
cat ${logdnode} | grep "${vgId}," > ${logdnode}.${vgId}
fi
done
done
......@@ -54,7 +54,7 @@ done
echo ""
echo "generate log.leader.term ..."
cat ${logpath}/*.main | grep "become leader" | grep -v "config change" | awk '{print $5,$0}' | awk -F, '{print $4"_"$0}' | sort -k1 > ${logpath}/log.leader.term
cat ${logpath}/*.main | grep "become leader" | grep -v "config change" | awk '{print $5,$0}' | awk -F, '{print $4"_"$0}' | sort -T. -k1 > ${logpath}/log.leader.term
echo ""
echo "generate log.index, log.snapshot, log.records, log.actions ..."
......
......@@ -28,12 +28,12 @@ SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
}
SyncClientRequestBatch *createMsg() {
SRpcMsg rpcMsgArr[5];
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
SRpcMsg *rpcMsgPArr[5];
memset(rpcMsgPArr, 0, sizeof(rpcMsgPArr));
for (int32_t i = 0; i < 5; ++i) {
SRpcMsg *pRpcMsg = createRpcMsg(i, 20);
rpcMsgArr[i] = *pRpcMsg;
taosMemoryFree(pRpcMsg);
rpcMsgPArr[i] = pRpcMsg;
//taosMemoryFree(pRpcMsg);
}
SRaftMeta raftArr[5];
......@@ -43,7 +43,7 @@ SyncClientRequestBatch *createMsg() {
raftArr[i].isWeak = i % 2;
}
SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgArr, raftArr, 5, 1234);
SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgPArr, raftArr, 5, 1234);
return pMsg;
}
......
add_executable(transportTest "")
add_executable(transUT "")
add_executable(pushServer "")
add_executable(svrBench "")
add_executable(cliBench "")
target_sources(transUT
PRIVATE
......@@ -12,9 +13,13 @@ target_sources(transportTest
"transportTests.cpp"
)
target_sources(pushServer
target_sources(svrBench
PRIVATE
"pushServer.c"
"svrBench.c"
)
target_sources(cliBench
PRIVATE
"cliBench.c"
)
target_include_directories(transportTest
......@@ -45,13 +50,37 @@ target_include_directories(transUT
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(pushServer
target_include_directories(svrBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(svrBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (svrBench
os
util
common
gtest_main
transport
)
target_include_directories(cliBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(cliBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (pushServer
target_link_libraries (cliBench
os
util
common
......
此差异已折叠。
......@@ -24,12 +24,12 @@ int msgSize = 128;
int commit = 0;
TdFilePtr pDataFile = NULL;
STaosQueue *qhandle = NULL;
STaosQset * qset = NULL;
STaosQset *qset = NULL;
void processShellMsg() {
static int num = 0;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
SRpcMsg *pRpcMsg, rpcMsg;
int type;
SQueueInfo qinfo = {0};
......@@ -77,7 +77,6 @@ void processShellMsg() {
taosFreeQitem(pRpcMsg);
{
// taosSsleep(1);
SRpcMsg nRpcMsg = {0};
nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.contLen = msgSize;
......@@ -93,26 +92,6 @@ void processShellMsg() {
taosFreeQall(qall);
}
int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
if (strcmp(meterId, "michael") == 0) {
*spi = 1;
*encrypt = 0;
strcpy(secret, "mypassword");
strcpy(ckey, "key");
} else if (strcmp(meterId, "jeff") == 0) {
*spi = 0;
*encrypt = 0;
} else {
ret = -1; // user not there
}
return ret;
}
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg *pTemp;
......@@ -131,11 +110,12 @@ int main(int argc, char *argv[]) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 7000;
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
rpcInit.label = "SER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcDebugFlag = 131;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
......@@ -170,7 +150,7 @@ int main(int argc, char *argv[]) {
tsAsyncLog = 0;
rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 10);
taosInitLog("server.log", 100000);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
......
......@@ -294,7 +294,7 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
assert(cnt <= pArray->size);
pArray->size = pArray->size - cnt;
if (pArray->size == 0) {
if (pArray->size == 0 || cnt == 0) {
return;
}
memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size * pArray->elemSize);
......
此差异已折叠。
此差异已折叠。
......@@ -298,8 +298,8 @@
./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
# temp disable
#./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
#./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
# --- valgrind
./test.sh -f tsim/valgrind/checkError1.sim
......@@ -325,7 +325,7 @@
# --- sync
./test.sh -f tsim/sync/3Replica1VgElect.sim
./test.sh -f tsim/sync/3Replica5VgElect.sim
#./test.sh -f tsim/sync/3Replica5VgElect.sim
./test.sh -f tsim/sync/oneReplica1VgElect.sim
./test.sh -f tsim/sync/oneReplica5VgElect.sim
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册