提交 e9348675 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/mnode

...@@ -37,14 +37,6 @@ enum { ...@@ -37,14 +37,6 @@ enum {
TMQ_MSG_TYPE__EP_RSP, TMQ_MSG_TYPE__EP_RSP,
}; };
enum {
STREAM_TRIGGER__AT_ONCE = 1,
STREAM_TRIGGER__WINDOW_CLOSE,
STREAM_TRIGGER__BY_COUNT,
STREAM_TRIGGER__BY_BATCH_COUNT,
STREAM_TRIGGER__BY_EVENT_TIME,
};
typedef enum EStreamType { typedef enum EStreamType {
STREAM_NORMAL = 1, STREAM_NORMAL = 1,
STREAM_INVERT, STREAM_INVERT,
......
...@@ -96,6 +96,7 @@ extern bool tsDeadLockKillQuery; ...@@ -96,6 +96,7 @@ extern bool tsDeadLockKillQuery;
// query client // query client
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
extern int32_t tsQuerySmaOptimize;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_LRUCACHE_H_
#define _TD_UTIL_LRUCACHE_H_
#include "thash.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SLRUCache SLRUCache;
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value);
typedef struct LRUHandle LRUHandle;
typedef enum {
TAOS_LRU_PRIORITY_HIGH,
TAOS_LRU_PRIORITY_LOW
} LRUPriority;
typedef enum {
TAOS_LRU_STATUS_OK,
TAOS_LRU_STATUS_FAIL,
TAOS_LRU_STATUS_INCOMPLETE,
TAOS_LRU_STATUS_OK_OVERWRITTEN
} LRUStatus;
SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio);
void taosLRUCacheCleanup(SLRUCache *cache);
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority);
LRUHandle *taosLRUCacheLookup(SLRUCache * cache, const void *key, size_t keyLen);
void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
void taosLRUCacheEraseUnrefEntries(SLRUCache *cache);
bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle);
bool taosLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef);
void* taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle);
size_t taosLRUCacheGetUsage(SLRUCache *cache);
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache);
void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity);
size_t taosLRUCacheGetCapacity(SLRUCache *cache);
void taosLRUCacheSetStrictCapacity(SLRUCache *cache, bool strict);
bool taosLRUCacheIsStrictCapacity(SLRUCache *cache);
#ifdef __cplusplus
}
#endif
#endif // _TD_UTIL_LRUCACHE_H_
...@@ -167,7 +167,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c ...@@ -167,7 +167,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
taosThreadMutexInit(&pObj->mutex, NULL); taosThreadMutexInit(&pObj->mutex, NULL);
pObj->id = taosAddRef(clientConnRefPool, pObj); pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->schemalessType = 0; pObj->schemalessType = 1;
tscDebug("connObj created, 0x%" PRIx64, pObj->id); tscDebug("connObj created, 0x%" PRIx64, pObj->id);
return pObj; return pObj;
......
...@@ -306,19 +306,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -306,19 +306,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
const char* errStr = taos_errstr(res); const char* errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr); uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr);
taosMsleep(100);
} }
taos_free_result(res); taos_free_result(res);
// if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break; break;
} }
case SCHEMA_ACTION_ADD_TAG: { case SCHEMA_ACTION_ADD_TAG: {
...@@ -330,19 +321,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -330,19 +321,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
const char* errStr = taos_errstr(res); const char* errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
taosMsleep(100);
} }
taos_free_result(res); taos_free_result(res);
// if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break; break;
} }
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
...@@ -353,19 +335,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -353,19 +335,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
code = taos_errno(res); code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
taosMsleep(100);
} }
taos_free_result(res); taos_free_result(res);
// if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break; break;
} }
case SCHEMA_ACTION_CHANGE_TAG_SIZE: { case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
...@@ -376,19 +349,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -376,19 +349,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
code = taos_errno(res); code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
taosMsleep(100);
} }
taos_free_result(res); taos_free_result(res);
// if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break; break;
} }
case SCHEMA_ACTION_CREATE_STABLE: { case SCHEMA_ACTION_CREATE_STABLE: {
...@@ -428,18 +392,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -428,18 +392,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
code = taos_errno(res); code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
taosMsleep(100);
} }
taos_free_result(res); taos_free_result(res);
if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break; break;
} }
...@@ -473,6 +429,21 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH ...@@ -473,6 +429,21 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlCheckMeta(SSchema* schema, int32_t length, SArray* cols){
SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
for(uint16_t i = 0; i < length; i++){
taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
}
for(int32_t i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv* kv = (SSmlKv*)taosArrayGetP(cols, i);
if(taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL){
return -1;
}
}
return 0;
}
static int32_t smlModifyDBSchemas(SSmlHandle* info) { static int32_t smlModifyDBSchemas(SSmlHandle* info) {
int32_t code = 0; int32_t code = 0;
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
...@@ -483,6 +454,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -483,6 +454,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
while (tableMetaSml) { while (tableMetaSml) {
SSmlSTableMeta* sTableData = *tableMetaSml; SSmlSTableMeta* sTableData = *tableMetaSml;
STableMeta *pTableMeta = NULL; STableMeta *pTableMeta = NULL;
bool needCheckMeta = false; // for multi thread
size_t superTableLen = 0; size_t superTableLen = 0;
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
...@@ -533,6 +505,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -533,6 +505,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
needCheckMeta = true;
} else { } else {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
goto end; goto end;
...@@ -544,6 +517,20 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -544,6 +517,20 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable); uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable);
goto end; goto end;
} }
if(needCheckMeta){
code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" check tag failed. super table name %s", info->id, (char*)superTable);
goto end;
}
code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" check cols failed. super table name %s", info->id, (char*)superTable);
goto end;
}
}
sTableData->tableMeta = pTableMeta; sTableData->tableMeta = pTableMeta;
tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml); tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml);
...@@ -2368,6 +2355,7 @@ static void smlInsertCallback(void* param, void* res, int32_t code) { ...@@ -2368,6 +2355,7 @@ static void smlInsertCallback(void* param, void* res, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
SSmlHandle* info = (SSmlHandle *)param; SSmlHandle* info = (SSmlHandle *)param;
uDebug("SML:0x%"PRIx64" result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
// lock // lock
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
taosThreadSpinLock(&info->params->lock); taosThreadSpinLock(&info->params->lock);
...@@ -2496,8 +2484,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2496,8 +2484,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
end: end:
taosThreadSpinDestroy(&params.lock); taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem); tsem_destroy(&params.sem);
((STscObj *)taos)->schemalessType = 0; // ((STscObj *)taos)->schemalessType = 0;
uDebug("result:%s", request->msgBuf); ((STscObj *)taos)->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf);
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }
...@@ -408,7 +408,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_ ...@@ -408,7 +408,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_
pParam->userParam = userParam; pParam->userParam = userParam;
if (!async) tsem_init(&pParam->rspSem, 0, 0); if (!async) tsem_init(&pParam->rspSem, 0, 0);
sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) goto END; if (sendInfo == NULL) goto END;
sendInfo->msgInfo = (SDataBuf){ sendInfo->msgInfo = (SDataBuf){
.pData = buf, .pData = buf,
...@@ -704,7 +704,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -704,7 +704,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
void* abuf = buf; void* abuf = buf;
tSerializeSCMSubscribeReq(&abuf, &req); tSerializeSCMSubscribeReq(&abuf, &req);
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) goto FAIL; if (sendInfo == NULL) goto FAIL;
SMqSubscribeCbParam param = { SMqSubscribeCbParam param = {
...@@ -1008,7 +1008,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1008,7 +1008,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
pParam->async = async; pParam->async = async;
tsem_init(&pParam->rspSem, 0, 0); tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) { if (sendInfo == NULL) {
tsem_destroy(&pParam->rspSem); tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
...@@ -1162,7 +1162,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1162,7 +1162,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
pParam->vgId = pVg->vgId; pParam->vgId = pVg->vgId;
pParam->epoch = tmq->epoch; pParam->epoch = tmq->epoch;
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) { if (sendInfo == NULL) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
taosMemoryFree(pParam); taosMemoryFree(pParam);
......
...@@ -41,7 +41,7 @@ TARGET_INCLUDE_DIRECTORIES( ...@@ -41,7 +41,7 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/client/inc" PRIVATE "${TD_SOURCE_DIR}/source/client/inc"
) )
#add_test( add_test(
# NAME smlTest NAME smlTest
# COMMAND smlTest COMMAND smlTest
#) )
...@@ -499,6 +499,7 @@ TEST(testCase, smlGetTimestampLen_Test) { ...@@ -499,6 +499,7 @@ TEST(testCase, smlGetTimestampLen_Test) {
ASSERT_EQ(len, 3); ASSERT_EQ(len, 3);
} }
/*
TEST(testCase, smlProcess_influx_Test) { TEST(testCase, smlProcess_influx_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr); ASSERT_NE(taos, nullptr);
...@@ -1259,4 +1260,4 @@ TEST(testCase, sml_16368_Test) { ...@@ -1259,4 +1260,4 @@ TEST(testCase, sml_16368_Test) {
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS); pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0); ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes); taos_free_result(pRes);
} }*/
...@@ -91,7 +91,7 @@ static const SSysDbTableSchema userDBSchema[] = { ...@@ -91,7 +91,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, // {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
// {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update
......
...@@ -86,6 +86,7 @@ bool tsSmlDataFormat = ...@@ -86,6 +86,7 @@ bool tsSmlDataFormat =
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQuerySmaOptimize = 1;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
...@@ -113,7 +114,7 @@ int32_t tsCompatibleModel = 1; ...@@ -113,7 +114,7 @@ int32_t tsCompatibleModel = 1;
int32_t tsCountAlwaysReturnValue = 1; int32_t tsCountAlwaysReturnValue = 1;
// 10 ms for sliding time, the value will changed in case of time precision changed // 10 ms for sliding time, the value will changed in case of time precision changed
int32_t tsMinSlidingTime = 10; int32_t tsMinSlidingTime = 10;
// the maxinum number of distict query result // the maxinum number of distict query result
int32_t tsMaxNumOfDistinctResults = 1000 * 10000; int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
...@@ -331,6 +332,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -331,6 +332,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
...@@ -541,6 +543,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -541,6 +543,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32; tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
return 0; return 0;
} }
......
...@@ -76,22 +76,22 @@ void deltaToUtcInitOnce() { ...@@ -76,22 +76,22 @@ void deltaToUtcInitOnce() {
static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec);
static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec); static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec);
static char* forwardToTimeStringEnd(char* str); static char* forwardToTimeStringEnd(char* str);
static bool checkTzPresent(const char* str, int32_t len); static bool checkTzPresent(const char* str, int32_t len);
static int32_t (*parseLocaltimeFp[])(char* timestr, int64_t* time, int32_t timePrec) = {parseLocaltime, static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec) = {parseLocaltime,
parseLocaltimeDst}; parseLocaltimeDst};
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */ /* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) { if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, time, timePrec, 'T'); return parseTimeWithTz(timestr, utime, timePrec, 'T');
} else if (checkTzPresent(timestr, len)) { } else if (checkTzPresent(timestr, len)) {
return parseTimeWithTz(timestr, time, timePrec, 0); return parseTimeWithTz(timestr, utime, timePrec, 0);
} else { } else {
return (*parseLocaltimeFp[day_light])((char*)timestr, time, timePrec); return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec);
} }
} }
...@@ -309,12 +309,36 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch ...@@ -309,12 +309,36 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch
return 0; return 0;
} }
int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { static FORCE_INLINE bool validateTm(struct tm* pTm) {
if (pTm == NULL) {
return false;
}
int32_t dayOfMonth[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};
int32_t leapYearMonthDay = 29;
int32_t year = pTm->tm_year + 1900;
bool isLeapYear = ((year % 100) == 0)? ((year % 400) == 0):((year % 4) == 0);
if (isLeapYear && (pTm->tm_mon == 1)) {
if (pTm->tm_mday > leapYearMonthDay) {
return false;
}
} else {
if (pTm->tm_mday > dayOfMonth[pTm->tm_mon]) {
return false;
}
}
return true;
}
int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec) {
*time = 0; *time = 0;
struct tm tm = {0}; struct tm tm = {0};
char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
if (str == NULL) { if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1; return -1;
} }
...@@ -343,13 +367,13 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { ...@@ -343,13 +367,13 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
return 0; return 0;
} }
int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec) {
*time = 0; *time = 0;
struct tm tm = {0}; struct tm tm = {0};
tm.tm_isdst = -1; tm.tm_isdst = -1;
char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
if (str == NULL) { if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1; return -1;
} }
......
...@@ -1503,8 +1503,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in ...@@ -1503,8 +1503,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)statusB, false); colDataAppend(pColInfo, rows, (const char *)statusB, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); // colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
char *p = buildRetension(pDb->cfg.pRetensions); char *p = buildRetension(pDb->cfg.pRetensions);
......
...@@ -351,9 +351,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -351,9 +351,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(totLevel <= 2); ASSERT(totLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
bool hasExtraSink = false; bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
if (totLevel == 2 || externalTargetDB) { SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pDbObj);
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
if (totLevel == 2 || externalTargetDB || multiTarget) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink // add extra sink
......
...@@ -136,6 +136,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -136,6 +136,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pReq->subKey); pReq->subKey);
return -1; return -1;
} }
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld", tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId); consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
......
...@@ -142,10 +142,8 @@ typedef struct SElapsedInfo { ...@@ -142,10 +142,8 @@ typedef struct SElapsedInfo {
typedef struct SHistoFuncBin { typedef struct SHistoFuncBin {
double lower; double lower;
double upper; double upper;
union { int64_t count;
int64_t count; double percentage;
double percentage;
};
} SHistoFuncBin; } SHistoFuncBin;
typedef struct SHistoFuncInfo { typedef struct SHistoFuncInfo {
...@@ -2844,6 +2842,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS ...@@ -2844,6 +2842,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) { if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
offset += pCol->info.bytes;
continue; continue;
} }
...@@ -3105,7 +3104,7 @@ int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { ...@@ -3105,7 +3104,7 @@ int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SSpreadInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); SSpreadInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
spreadTransferInfo(pDBuf, pSBuf); spreadTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3276,7 +3275,7 @@ int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { ...@@ -3276,7 +3275,7 @@ int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SElapsedInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); SElapsedInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
elapsedTransferInfo(pDBuf, pSBuf); elapsedTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3583,7 +3582,7 @@ int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { ...@@ -3583,7 +3582,7 @@ int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SHistoFuncInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); SHistoFuncInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
histogramTransferInfo(pDBuf, pSBuf); histogramTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3779,7 +3778,7 @@ int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { ...@@ -3779,7 +3778,7 @@ int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SHLLInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); SHLLInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
hllTransferInfo(pDBuf, pSBuf); hllTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4187,6 +4186,8 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -4187,6 +4186,8 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t alreadySampled = pInfo->numSampled;
int32_t startOffset = pCtx->offset; int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_s(pInputCol, i)) { if (colDataIsNull_s(pInputCol, i)) {
......
...@@ -24,6 +24,9 @@ extern "C" { ...@@ -24,6 +24,9 @@ extern "C" {
#include "parUtil.h" #include "parUtil.h"
#include "parser.h" #include "parser.h"
#define QUERY_SMA_OPTIMIZE_DISABLE 0
#define QUERY_SMA_OPTIMIZE_ENABLE 1
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery); int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery);
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery); int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
......
...@@ -801,7 +801,8 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti ...@@ -801,7 +801,8 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti
((SDatabaseOptions*)pOptions)->pRetentions = pVal; ((SDatabaseOptions*)pOptions)->pRetentions = pVal;
break; break;
case DB_OPTION_SCHEMALESS: case DB_OPTION_SCHEMALESS:
((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); // ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->schemaless = 1;
break; break;
default: default:
break; break;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "parInt.h" #include "parInt.h"
#include "parToken.h" #include "parToken.h"
#include "systable.h" #include "systable.h"
#include "tglobal.h"
typedef void* (*FMalloc)(size_t); typedef void* (*FMalloc)(size_t);
typedef void (*FFree)(void*); typedef void (*FFree)(void*);
...@@ -116,7 +117,7 @@ static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFu ...@@ -116,7 +117,7 @@ static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFu
} }
static bool needGetTableIndex(SNode* pStmt) { static bool needGetTableIndex(SNode* pStmt) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { if (QUERY_SMA_OPTIMIZE_ENABLE == tsQuerySmaOptimize && QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt; SSelectStmt* pSelect = (SSelectStmt*)pStmt;
return (NULL != pSelect->pWindow && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow)); return (NULL != pSelect->pWindow && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow));
} }
......
...@@ -1297,11 +1297,12 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { ...@@ -1297,11 +1297,12 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
} }
static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) { static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) {
SDbCfgInfo pInfo = {0}; // SDbCfgInfo pInfo = {0};
char fullName[TSDB_TABLE_FNAME_LEN]; // char fullName[TSDB_TABLE_FNAME_LEN];
snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); // snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName);
CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); // CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo));
return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; // return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
} }
// tb_name // tb_name
...@@ -2119,9 +2120,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS ...@@ -2119,9 +2120,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
isOrdered = false; isOrdered = false;
} }
if (index < 0) { if (index < 0) {
uError("smlBoundColumnData. index:%d", index);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
if (pColList->cols[index].valStat == VAL_STAT_HAS) { if (pColList->cols[index].valStat == VAL_STAT_HAS) {
uError("smlBoundColumnData. already set. index:%d", index);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
lastColIdx = index; lastColIdx = index;
......
...@@ -1411,7 +1411,7 @@ static bool isSingleTable(SRealTableNode* pRealTable) { ...@@ -1411,7 +1411,7 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
} }
static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) { static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (pCxt->createStream) { if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (NULL != pCxt->pCurrSelectStmt && NULL != pCxt->pCurrSelectStmt->pWindow && if (NULL != pCxt->pCurrSelectStmt && NULL != pCxt->pCurrSelectStmt->pWindow &&
...@@ -2762,15 +2762,16 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt ...@@ -2762,15 +2762,16 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt
} }
static int32_t checkSchemalessDb(STranslateContext* pCxt, const char* pDbName) { static int32_t checkSchemalessDb(STranslateContext* pCxt, const char* pDbName) {
if (0 != pCxt->pParseCxt->schemalessType) { // if (0 != pCxt->pParseCxt->schemalessType) {
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
SDbCfgInfo info = {0}; // SDbCfgInfo info = {0};
int32_t code = getDBCfg(pCxt, pDbName, &info); // int32_t code = getDBCfg(pCxt, pDbName, &info);
if (TSDB_CODE_SUCCESS == code) { // if (TSDB_CODE_SUCCESS == code) {
code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; // code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
} // }
return code; // return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
...@@ -5048,6 +5049,9 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p ...@@ -5048,6 +5049,9 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) { static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot; SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot;
int32_t code = checkSchemalessDb(pCxt, pStmt->dbName); int32_t code = checkSchemalessDb(pCxt, pStmt->dbName);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
......
...@@ -158,7 +158,9 @@ class MockCatalogServiceImpl { ...@@ -158,7 +158,9 @@ class MockCatalogServiceImpl {
} }
*pIndexes = taosArrayInit(it->second.size(), sizeof(STableIndexInfo)); *pIndexes = taosArrayInit(it->second.size(), sizeof(STableIndexInfo));
for (const auto& index : it->second) { for (const auto& index : it->second) {
taosArrayPush(*pIndexes, &index); STableIndexInfo info;
taosArrayPush(*pIndexes, copyTableIndexInfo(&info, &index));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -316,6 +318,12 @@ class MockCatalogServiceImpl { ...@@ -316,6 +318,12 @@ class MockCatalogServiceImpl {
pEpSet->inUse = 0; pEpSet->inUse = 0;
} }
STableIndexInfo* copyTableIndexInfo(STableIndexInfo* pDst, const STableIndexInfo* pSrc) const {
memcpy(pDst, pSrc, sizeof(STableIndexInfo));
pDst->expr = strdup(pSrc->expr);
return pDst;
}
std::string toDbname(const std::string& dbFullName) const { std::string toDbname(const std::string& dbFullName) const {
std::string::size_type n = dbFullName.find("."); std::string::size_type n = dbFullName.find(".");
if (n == std::string::npos) { if (n == std::string::npos) {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "functionMgt.h" #include "functionMgt.h"
#include "index.h" #include "index.h"
#include "planInt.h" #include "planInt.h"
#include "ttime.h"
#define OPTIMIZE_FLAG_MASK(n) (1 << n) #define OPTIMIZE_FLAG_MASK(n) (1 << n)
...@@ -816,7 +817,8 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde ...@@ -816,7 +817,8 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde
pSmaScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; pSmaScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
pSmaScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo)); pSmaScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
if (NULL == pSmaScan->pVgroupList) { pSmaScan->node.pTargets = nodesCloneList(pCols);
if (NULL == pSmaScan->pVgroupList || NULL == pSmaScan->node.pTargets) {
nodesDestroyNode(pSmaScan); nodesDestroyNode(pSmaScan);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -828,19 +830,26 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde ...@@ -828,19 +830,26 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool smaOptEqualInterval(SWindowLogicNode* pWindow, STableIndexInfo* pIndex) { static bool smaOptEqualInterval(SScanLogicNode* pScan, SWindowLogicNode* pWindow, STableIndexInfo* pIndex) {
if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit || if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit ||
pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding || pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding ||
pWindow->slidingUnit != pIndex->slidingUnit) { pWindow->slidingUnit != pIndex->slidingUnit) {
return false; return false;
} }
// todo time range if (IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
SInterval interval = {.interval = pIndex->interval,
.intervalUnit = pIndex->intervalUnit,
.offset = pIndex->offset,
.offsetUnit = TIME_UNIT_MILLISECOND,
.sliding = pIndex->sliding,
.slidingUnit = pIndex->slidingUnit,
.precision = pScan->node.precision};
return (pScan->scanRange.skey == taosTimeTruncate(pScan->scanRange.skey, &interval, pScan->node.precision)) &&
(pScan->scanRange.ekey + 1 == taosTimeTruncate(pScan->scanRange.ekey + 1, &interval, pScan->node.precision));
}
return true; return true;
} }
// #define SMA_TABLE_NAME "#sma_table"
// #define SMA_COL_NAME_PREFIX "#sma_col_"
static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) { static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
...@@ -850,9 +859,7 @@ static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) ...@@ -850,9 +859,7 @@ static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId)
pCol->tableType = TSDB_SUPER_TABLE; pCol->tableType = TSDB_SUPER_TABLE;
pCol->colId = colId; pCol->colId = colId;
pCol->colType = COLUMN_TYPE_COLUMN; pCol->colType = COLUMN_TYPE_COLUMN;
snprintf(pCol->colName, sizeof(pCol->colName), "#sma_col_%d", pCol->colId); strcpy(pCol->colName, ((SExprNode*)pFunc)->aliasName);
// strcpy(pCol->tableName, SMA_TABLE_NAME);
// strcpy(pCol->tableAlias, SMA_TABLE_NAME);
pCol->node.resType = ((SExprNode*)pFunc)->resType; pCol->node.resType = ((SExprNode*)pFunc)->resType;
strcpy(pCol->node.aliasName, ((SExprNode*)pFunc)->aliasName); strcpy(pCol->node.aliasName, ((SExprNode*)pFunc)->aliasName);
return (SNode*)pCol; return (SNode*)pCol;
...@@ -876,12 +883,13 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis ...@@ -876,12 +883,13 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis
SNode* pFunc = NULL; SNode* pFunc = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0; int32_t index = 0;
int32_t smaFuncIndex = -1;
*pWStrartIndex = -1; *pWStrartIndex = -1;
FOREACH(pFunc, pFuncs) { FOREACH(pFunc, pFuncs) {
if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) { if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
*pWStrartIndex = index; *pWStrartIndex = index;
} }
int32_t smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs); smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs);
if (smaFuncIndex < 0) { if (smaFuncIndex < 0) {
break; break;
} else { } else {
...@@ -893,7 +901,7 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis ...@@ -893,7 +901,7 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis
++index; ++index;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && smaFuncIndex >= 0) {
*pOutput = pCols; *pOutput = pCols;
} else { } else {
nodesDestroyList(pCols); nodesDestroyList(pCols);
...@@ -902,9 +910,10 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis ...@@ -902,9 +910,10 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis
return code; return code;
} }
static int32_t smaOptCouldApplyIndex(SWindowLogicNode* pWindow, STableIndexInfo* pIndex, SNodeList** pCols, static int32_t smaOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList** pCols,
int32_t* pWStrartIndex) { int32_t* pWStrartIndex) {
if (!smaOptEqualInterval(pWindow, pIndex)) { SWindowLogicNode* pWindow = (SWindowLogicNode*)pScan->node.pParent;
if (!smaOptEqualInterval(pScan, pWindow, pIndex)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNodeList* pSmaFuncs = NULL; SNodeList* pSmaFuncs = NULL;
...@@ -961,8 +970,8 @@ static int32_t smaOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrar ...@@ -961,8 +970,8 @@ static int32_t smaOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrar
return smaOptCreateMergeKey(nodesListGetNode(pInterval->node.pTargets, wstrartIndex), pMergeKeys); return smaOptCreateMergeKey(nodesListGetNode(pInterval->node.pTargets, wstrartIndex), pMergeKeys);
} }
static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex, static int32_t smaOptApplyIndexExt(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
SNodeList* pSmaCols, int32_t wstrartIndex) { SNodeList* pSmaCols, int32_t wstrartIndex) {
SWindowLogicNode* pInterval = (SWindowLogicNode*)pScan->node.pParent; SWindowLogicNode* pInterval = (SWindowLogicNode*)pScan->node.pParent;
SNodeList* pMergeTargets = nodesCloneList(pInterval->node.pTargets); SNodeList* pMergeTargets = nodesCloneList(pInterval->node.pTargets);
if (NULL == pMergeTargets) { if (NULL == pMergeTargets) {
...@@ -984,6 +993,16 @@ static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pS ...@@ -984,6 +993,16 @@ static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pS
return code; return code;
} }
static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
SNodeList* pSmaCols, int32_t wstrartIndex) {
SLogicNode* pSmaScan = NULL;
int32_t code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan);
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pScan->node.pParent, pSmaScan);
}
return code;
}
static void smaOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); } static void smaOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) { static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) {
...@@ -993,7 +1012,7 @@ static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp ...@@ -993,7 +1012,7 @@ static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i); STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i);
SNodeList* pSmaCols = NULL; SNodeList* pSmaCols = NULL;
int32_t wstrartIndex = -1; int32_t wstrartIndex = -1;
code = smaOptCouldApplyIndex((SWindowLogicNode*)pScan->node.pParent, pIndex, &pSmaCols, &wstrartIndex); code = smaOptCouldApplyIndex(pScan, pIndex, &pSmaCols, &wstrartIndex);
if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) { if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) {
code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex); code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex);
taosArrayDestroyEx(pScan->pSmaIndexes, smaOptDestroySmaIndex); taosArrayDestroyEx(pScan->pSmaIndexes, smaOptDestroySmaIndex);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "planTestUtil.h" #include "planTestUtil.h"
#include "planner.h" #include "planner.h"
#include "tglobal.h"
using namespace std; using namespace std;
...@@ -45,6 +46,14 @@ TEST_F(PlanOtherTest, createSmaIndex) { ...@@ -45,6 +46,14 @@ TEST_F(PlanOtherTest, createSmaIndex) {
run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)"); run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
run("SELECT SUM(c4) FROM t1 INTERVAL(10s)"); run("SELECT SUM(c4) FROM t1 INTERVAL(10s)");
run("SELECT _WSTARTTS, MIN(c3 + 10) FROM t1 "
"WHERE ts BETWEEN TIMESTAMP '2022-04-01 00:00:00' AND TIMESTAMP '2022-04-30 23:59:59.999' INTERVAL(10s)");
run("SELECT SUM(c4), MAX(c3) FROM t1 INTERVAL(10s)");
tsQuerySmaOptimize = 0;
run("SELECT SUM(c4) FROM t1 INTERVAL(10s)");
} }
TEST_F(PlanOtherTest, explain) { TEST_F(PlanOtherTest, explain) {
......
...@@ -28,6 +28,12 @@ TEST_F(PlanSubqeuryTest, basic) { ...@@ -28,6 +28,12 @@ TEST_F(PlanSubqeuryTest, basic) {
run("SELECT LAST(c1) FROM (SELECT * FROM t1)"); run("SELECT LAST(c1) FROM (SELECT * FROM t1)");
run("SELECT c1 FROM (SELECT TODAY() AS c1 FROM t1)"); run("SELECT c1 FROM (SELECT TODAY() AS c1 FROM t1)");
run("SELECT NOW() FROM t1");
run("SELECT NOW() FROM (SELECT * FROM t1)");
// run("SELECT NOW() FROM (SELECT * FROM t1) ORDER BY ts");
} }
TEST_F(PlanSubqeuryTest, doubleGroupBy) { TEST_F(PlanSubqeuryTest, doubleGroupBy) {
......
...@@ -110,9 +110,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp ...@@ -110,9 +110,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
return 0; return 0;
} }
// continue dispatch // continue dispatch
if (pTask->dispatchType != TASK_DISPATCH__NONE) { streamDispatch(pTask, pMsgCb);
streamDispatch(pTask, pMsgCb);
}
return 0; return 0;
} }
......
...@@ -182,6 +182,7 @@ FAIL: ...@@ -182,6 +182,7 @@ FAIL:
} }
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
#if 1 #if 1
int8_t old = int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
......
...@@ -713,7 +713,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -713,7 +713,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
sInfo("sync event vgId:%d log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd); sDebug("vgId:%d sync event log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code; return code;
...@@ -994,7 +994,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -994,7 +994,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex;
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin, sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin,
commitEnd, syncUtilState2String(ths->state)); commitEnd, syncUtilState2String(ths->state));
} }
......
...@@ -189,16 +189,16 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -189,16 +189,16 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) { if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender); char* s = snapshotSender2Str(pSender);
sInfo( sDebug(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld" "lastConfigIndex:%ld"
"sender:%s", "sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, s); pSender->snapshot.lastConfigIndex, s);
taosMemoryFree(s); taosMemoryFree(s);
} else { } else {
sInfo( sDebug(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " "vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld", "lastApplyTerm:%lu lastConfigIndex:%ld",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex); pSender->snapshot.lastConfigIndex);
......
...@@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex;
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state)); pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state));
} }
......
...@@ -470,7 +470,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -470,7 +470,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_PROPOSE_OTHER_ERROR;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
sTrace("sync event vgId:%d propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub; SRespStub stub;
...@@ -501,7 +501,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -501,7 +501,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
sInfo("sync event vgId:%d sync open", pSyncInfo->vgId); sDebug("vgId:%d sync event sync open", pSyncInfo->vgId);
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
...@@ -761,7 +761,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ...@@ -761,7 +761,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
} }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
sInfo("sync event vgId:%d sync close", pSyncNode->vgId); sDebug("vgId:%d sync event sync close", pSyncNode->vgId);
int32_t ret; int32_t ret;
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
...@@ -1240,7 +1240,8 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { ...@@ -1240,7 +1240,8 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
} }
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sInfo("sync event vgId:%d become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, debugStr); sDebug("vgId:%d sync event become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy,
debugStr);
// maybe clear leader cache // maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
...@@ -1274,7 +1275,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1274,7 +1275,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>> // /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
// //
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
sInfo("sync event vgId:%d become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, debugStr); sDebug("vgId:%d sync event become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy,
debugStr);
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
...@@ -1882,7 +1884,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -1882,7 +1884,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0; int32_t code = 0;
ESyncState state = flag; ESyncState state = flag;
sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
endIndex, syncUtilState2String(state)); endIndex, syncUtilState2String(state));
// execute fsm // execute fsm
...@@ -1931,7 +1933,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -1931,7 +1933,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm); ths->pFsm->FpRestoreFinishCb(ths->pFsm);
} }
ths->restoreFinish = true; ths->restoreFinish = true;
sInfo("sync event vgId:%d restore finish", ths->vgId); sDebug("vgId:%d sync event restore finish", ths->vgId);
} }
} }
......
...@@ -162,7 +162,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -162,7 +162,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true); walFsync(pWal, true);
sTrace("sync event vgId:%d write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId, sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId,
pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
...@@ -320,7 +320,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -320,7 +320,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync(pWal, true); walFsync(pWal, true);
sTrace("sync event old write wal: %ld", pEntry->index); sDebug("sync event old write wal: %ld", pEntry->index);
return code; return code;
} }
......
...@@ -140,16 +140,16 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -140,16 +140,16 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace( sDebug(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld send " "lastConfigIndex:%ld send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sTrace( sDebug(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld", "lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
...@@ -278,23 +278,23 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -278,23 +278,23 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace( sDebug(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld send " "lastConfigIndex:%ld send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sTrace( sDebug(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld", "lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
} }
} else { } else {
sTrace( sDebug(
"sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld", "lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
...@@ -328,11 +328,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -328,11 +328,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId,
host, port, pSender->seq, pSender->ack, msgStr); host, port, pSender->seq, pSender->ack, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port, sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port,
pSender->seq, pSender->ack); pSender->seq, pSender->ack);
} }
...@@ -565,12 +565,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -565,12 +565,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", sDebug(
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu", sDebug(
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
} }
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
...@@ -597,12 +602,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -597,12 +602,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool isDrop; bool isDrop;
if (IamInNew) { if (IamInNew) {
sTrace("sync event vgId:%d update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", sDebug("vgId:%d sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ",
pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else { } else {
sTrace( sDebug(
"sync event vgId:%d do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ", "lastConfigIndex:%ld ",
pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
} }
...@@ -626,19 +631,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -626,19 +631,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sInfo( sDebug(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, " "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, raft log:%s", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, raft log:%s",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
logSimpleStr); snapshot.lastConfigIndex, logSimpleStr);
taosMemoryFree(logSimpleStr); taosMemoryFree(logSimpleStr);
} else { } else {
sInfo( sDebug(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, " "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm); pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
snapshot.lastConfigIndex);
} }
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
...@@ -648,12 +654,18 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -648,12 +654,18 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", sDebug(
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu", sDebug(
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex);
} }
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
...@@ -667,14 +679,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -667,14 +679,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace( sDebug(
"sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv " "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv "
"msg:%s", "msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sTrace("sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu", sDebug(
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex);
} }
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
...@@ -693,13 +710,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -693,13 +710,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace( sDebug(
"sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); "lastConfigIndex:%ld, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sTrace("sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu", sDebug(
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
} }
} else { } else {
......
...@@ -81,24 +81,24 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -81,24 +81,24 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
rel.tv_nsec = nanosecs; rel.tv_nsec = nanosecs;
GetSystemTimeAsFileTime(&ft_before); GetSystemTimeAsFileTime(&ft_before);
errno = 0; // errno = 0;
rc = sem_timedwait(&sem, pthread_win32_getabstime_np(&ts, &rel)); rc = sem_timedwait(sem, pthread_win32_getabstime_np(&ts, &rel));
/* This should have timed out */ /* This should have timed out */
assert(errno == ETIMEDOUT); // assert(errno == ETIMEDOUT);
assert(rc != 0); // assert(rc != 0);
GetSystemTimeAsFileTime(&ft_after); // GetSystemTimeAsFileTime(&ft_after);
// We specified a non-zero wait. Time must advance. // // We specified a non-zero wait. Time must advance.
if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime) // if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime)
{ // {
printf("nanoseconds: %d, rc: %d, errno: %d. before filetime: %d, %d; after filetime: %d, %d\n", // printf("nanoseconds: %d, rc: %d, errno: %d. before filetime: %d, %d; after filetime: %d, %d\n",
nanosecs, rc, errno, // nanosecs, rc, errno,
(int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime, // (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime,
(int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime); // (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime);
printf("time must advance during sem_timedwait."); // printf("time must advance during sem_timedwait.");
return 1; // return 1;
} // }
return 0; return rc;
} }
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
......
此差异已折叠。
...@@ -589,7 +589,10 @@ class TDDnodes: ...@@ -589,7 +589,10 @@ class TDDnodes:
psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'" psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID): while(processID):
killCmd = "kill -9 %s > /dev/null 2>&1" % processID if platform.system().lower() == 'windows':
killCmd = "kill -9 %s > nul 2>&1" % processID
else:
killCmd = "kill -9 %s > /dev/null 2>&1" % processID
os.system(killCmd) os.system(killCmd)
time.sleep(1) time.sleep(1)
processID = subprocess.check_output( processID = subprocess.check_output(
...@@ -599,7 +602,10 @@ class TDDnodes: ...@@ -599,7 +602,10 @@ class TDDnodes:
psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID): while(processID):
killCmd = "kill -TERM %s > /dev/null 2>&1" % processID if platform.system().lower() == 'windows':
killCmd = "kill -TERM %s > nul 2>&1" % processID
else:
killCmd = "kill -TERM %s > /dev/null 2>&1" % processID
os.system(killCmd) os.system(killCmd)
time.sleep(1) time.sleep(1)
processID = subprocess.check_output( processID = subprocess.check_output(
......
...@@ -2,20 +2,20 @@ ...@@ -2,20 +2,20 @@
#======================b1-start=============== #======================b1-start===============
# ---- user # ---- user
./test.sh -f tsim/user/basic1.sim #./test.sh -f tsim/user/basic1.sim
./test.sh -f tsim/user/pass_alter.sim #./test.sh -f tsim/user/pass_alter.sim
./test.sh -f tsim/user/pass_len.sim #./test.sh -f tsim/user/pass_len.sim
./test.sh -f tsim/user/user_len.sim #./test.sh -f tsim/user/user_len.sim
./test.sh -f tsim/user/privilege1.sim #./test.sh -f tsim/user/privilege1.sim
./test.sh -f tsim/user/privilege2.sim #./test.sh -f tsim/user/privilege2.sim#
# ---- db ## ---- db
./test.sh -f tsim/db/create_all_options.sim #./test.sh -f tsim/db/create_all_options.sim
./test.sh -f tsim/db/alter_option.sim #./test.sh -f tsim/db/alter_option.sim
./test.sh -f tsim/db/basic1.sim #./test.sh -f tsim/db/basic1.sim
./test.sh -f tsim/db/basic2.sim #./test.sh -f tsim/db/basic2.sim
./test.sh -f tsim/db/basic3.sim #./test.sh -f tsim/db/basic3.sim
./test.sh -f tsim/db/basic6.sim #./test.sh -f tsim/db/basic6.sim
./test.sh -f tsim/db/basic7.sim ./test.sh -f tsim/db/basic7.sim
./test.sh -f tsim/db/error1.sim ./test.sh -f tsim/db/error1.sim
./test.sh -f tsim/db/taosdlog.sim ./test.sh -f tsim/db/taosdlog.sim
......
...@@ -231,8 +231,10 @@ sql use test3; ...@@ -231,8 +231,10 @@ sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s); sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s);
sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s); sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, max(b), a,c from t1 session(ts,10s); sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, top(b,3), a,c from t1 session(ts,10s);
sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, max(b), a,c from t1 session(ts,10s); sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0); sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4); sql insert into t1 values(1648791213002,2,3,2,3.4);
sql insert into t1 values(1648791213003,4,9,3,4.8); sql insert into t1 values(1648791213003,4,9,3,4.8);
...@@ -279,4 +281,16 @@ if $rows == 0 then ...@@ -279,4 +281,16 @@ if $rows == 0 then
goto loop3 goto loop3
endi endi
sql select * from streamt7;
if $rows == 0 then
print ======$rows
goto loop3
endi
sql select * from streamt8;
if $rows == 0 then
print ======$rows
goto loop3
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
import time
from time import sleep from time import sleep
from util.log import * from util.log import *
...@@ -16,7 +17,7 @@ class TDTestCase: ...@@ -16,7 +17,7 @@ class TDTestCase:
self.ts = 1640966400000 # 2022-1-1 00:00:00.000 self.ts = 1640966400000 # 2022-1-1 00:00:00.000
def check_customize_param_ms(self): def check_customize_param_ms(self):
time_zone = os.popen('date "+%z"').read().strip() time_zone = time.strftime('%z')
tdSql.execute('create database db1 precision "ms"') tdSql.execute('create database db1 precision "ms"')
tdSql.execute('use db1') tdSql.execute('use db1')
tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)') tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)')
......
...@@ -83,6 +83,8 @@ class TDTestCase: ...@@ -83,6 +83,8 @@ class TDTestCase:
tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}") tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}")
pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'):
pre_data = np.array(pre_data, dtype = 'int64')
print("data is ", pre_data) print("data is ", pre_data)
pre_csum = np.cumsum(pre_data) pre_csum = np.cumsum(pre_data)
tdSql.query(self.csum_query_form( tdSql.query(self.csum_query_form(
...@@ -124,6 +126,8 @@ class TDTestCase: ...@@ -124,6 +126,8 @@ class TDTestCase:
tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}") tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}")
offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0 offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0
pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'):
pre_result = np.array(pre_result, dtype = 'int64')
pre_csum = np.cumsum(pre_result)[offset_val:] pre_csum = np.cumsum(pre_result)[offset_val:]
tdSql.query(self.csum_query_form( tdSql.query(self.csum_query_form(
col=col, alias=alias, table_expr=table_expr, condition=condition col=col, alias=alias, table_expr=table_expr, condition=condition
......
...@@ -83,6 +83,8 @@ class TDTestCase: ...@@ -83,6 +83,8 @@ class TDTestCase:
tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}") tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}")
pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'):
pre_data = np.array(pre_data, dtype = 'int64')
pre_diff = np.diff(pre_data) pre_diff = np.diff(pre_data)
# trans precision for data # trans precision for data
tdSql.query(self.diff_query_form( tdSql.query(self.diff_query_form(
...@@ -127,6 +129,8 @@ class TDTestCase: ...@@ -127,6 +129,8 @@ class TDTestCase:
tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}") tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}")
offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0 offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0
pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'):
pre_result = np.array(pre_result, dtype = 'int64')
pre_diff = np.diff(pre_result)[offset_val:] pre_diff = np.diff(pre_result)[offset_val:]
tdSql.query(self.diff_query_form( tdSql.query(self.diff_query_form(
col=col, alias=alias, table_expr=table_expr, condition=condition col=col, alias=alias, table_expr=table_expr, condition=condition
......
...@@ -245,6 +245,8 @@ class TDTestCase: ...@@ -245,6 +245,8 @@ class TDTestCase:
tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}") tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}")
pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'):
pre_data = np.array(pre_data, dtype = 'int64')
pre_mavg = np.convolve(pre_data, np.ones(k), "valid")/k pre_mavg = np.convolve(pre_data, np.ones(k), "valid")/k
tdSql.query(self.mavg_query_form( tdSql.query(self.mavg_query_form(
sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr, sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr,
...@@ -291,6 +293,8 @@ class TDTestCase: ...@@ -291,6 +293,8 @@ class TDTestCase:
# print(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}") # print(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}")
if not tdSql.queryResult: if not tdSql.queryResult:
pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'):
pre_result = np.array(pre_result, dtype = 'int64')
pre_mavg = pre_mavg = np.convolve(pre_result, np.ones(k), "valid")[offset_val:]/k pre_mavg = pre_mavg = np.convolve(pre_result, np.ones(k), "valid")[offset_val:]/k
tdSql.query(self.mavg_query_form( tdSql.query(self.mavg_query_form(
......
...@@ -196,11 +196,13 @@ class TDTestCase: ...@@ -196,11 +196,13 @@ class TDTestCase:
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
if (platform.system().lower() == 'windows'): if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &" shellCmd += "> nul 2>&1 &"
else: else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
...@@ -312,12 +314,13 @@ class TDTestCase: ...@@ -312,12 +314,13 @@ class TDTestCase:
pollDelay = 100 pollDelay = 100
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
if (platform.system().lower() == 'windows'): if (platform.system().lower() == 'windows'):
shellCmd += "> nul 2>&1 &" shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else: else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
...@@ -448,11 +451,13 @@ class TDTestCase: ...@@ -448,11 +451,13 @@ class TDTestCase:
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
if (platform.system().lower() == 'windows'): if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &" shellCmd += "> nul 2>&1 &"
else: else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -98,15 +98,19 @@ class TDTestCase: ...@@ -98,15 +98,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -81,15 +81,19 @@ class TDTestCase: ...@@ -81,15 +81,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -81,15 +81,19 @@ class TDTestCase: ...@@ -81,15 +81,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -81,15 +81,19 @@ class TDTestCase: ...@@ -81,15 +81,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
...@@ -291,8 +295,12 @@ class TDTestCase: ...@@ -291,8 +295,12 @@ class TDTestCase:
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
tdSql.query("select count(*) from %s.%s" %(parameterDict['dbName'], parameterDict['stbName']))
countOfStb = tdSql.getData(0,0)
print ("====total rows of stb: %d"%countOfStb)
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if totalConsumeRows != expectrowcnt: if totalConsumeRows < expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tdLog.info("again start consume processer") tdLog.info("again start consume processer")
...@@ -361,7 +369,10 @@ class TDTestCase: ...@@ -361,7 +369,10 @@ class TDTestCase:
time.sleep(2) time.sleep(2)
tdLog.info("pkill consume processor") tdLog.info("pkill consume processor")
os.system('pkill tmq_sim') if (platform.system().lower() == 'windows'):
os.system("TASKKILL /F /IM tmq_sim.exe")
else:
os.system('pkill tmq_sim')
expectRows = 0 expectRows = 0
resultList = self.selectConsumeResult(expectRows) resultList = self.selectConsumeResult(expectRows)
...@@ -433,7 +444,10 @@ class TDTestCase: ...@@ -433,7 +444,10 @@ class TDTestCase:
time.sleep(5) time.sleep(5)
tdLog.info("pkill consume processor") tdLog.info("pkill consume processor")
os.system('pkill tmq_sim') if (platform.system().lower() == 'windows'):
os.system("TASKKILL /F /IM tmq_sim.exe")
else:
os.system('pkill tmq_sim')
expectRows = 0 expectRows = 0
resultList = self.selectConsumeResult(expectRows) resultList = self.selectConsumeResult(expectRows)
......
...@@ -93,15 +93,19 @@ class TDTestCase: ...@@ -93,15 +93,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -93,15 +93,19 @@ class TDTestCase: ...@@ -93,15 +93,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -93,15 +93,19 @@ class TDTestCase: ...@@ -93,15 +93,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -93,15 +93,19 @@ class TDTestCase: ...@@ -93,15 +93,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -93,15 +93,19 @@ class TDTestCase: ...@@ -93,15 +93,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -93,15 +93,19 @@ class TDTestCase: ...@@ -93,15 +93,19 @@ class TDTestCase:
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1: if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath if (platform.system().lower() == 'windows'):
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += "> /dev/null 2>&1 &" shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -6,7 +6,7 @@ python3 .\test.py -f 0-others\telemetry.py ...@@ -6,7 +6,7 @@ python3 .\test.py -f 0-others\telemetry.py
python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\taosdMonitor.py
python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udfTest.py
python3 .\test.py -f 0-others\udf_create.py python3 .\test.py -f 0-others\udf_create.py
@REM python3 .\test.py -f 0-others\udf_restart_taosd.py python3 .\test.py -f 0-others\udf_restart_taosd.py
python3 .\test.py -f 0-others\cachelast.py python3 .\test.py -f 0-others\cachelast.py
python3 .\test.py -f 0-others\user_control.py python3 .\test.py -f 0-others\user_control.py
...@@ -21,7 +21,7 @@ python3 .\test.py -f 1-insert\alter_table.py ...@@ -21,7 +21,7 @@ python3 .\test.py -f 1-insert\alter_table.py
python3 .\test.py -f 2-query\between.py python3 .\test.py -f 2-query\between.py
python3 .\test.py -f 2-query\distinct.py python3 .\test.py -f 2-query\distinct.py
python3 .\test.py -f 2-query\varchar.py python3 .\test.py -f 2-query\varchar.py
@REM python3 .\test.py -f 2-query\ltrim.py python3 .\test.py -f 2-query\ltrim.py
python3 .\test.py -f 2-query\rtrim.py python3 .\test.py -f 2-query\rtrim.py
python3 .\test.py -f 2-query\length.py python3 .\test.py -f 2-query\length.py
python3 .\test.py -f 2-query\char_length.py python3 .\test.py -f 2-query\char_length.py
...@@ -32,12 +32,12 @@ python3 .\test.py -f 2-query\join2.py ...@@ -32,12 +32,12 @@ python3 .\test.py -f 2-query\join2.py
python3 .\test.py -f 2-query\cast.py python3 .\test.py -f 2-query\cast.py
python3 .\test.py -f 2-query\union.py python3 .\test.py -f 2-query\union.py
python3 .\test.py -f 2-query\union1.py python3 .\test.py -f 2-query\union1.py
@REM python3 .\test.py -f 2-query\concat.py python3 .\test.py -f 2-query\concat.py
python3 .\test.py -f 2-query\concat2.py python3 .\test.py -f 2-query\concat2.py
python3 .\test.py -f 2-query\concat_ws.py python3 .\test.py -f 2-query\concat_ws.py
python3 .\test.py -f 2-query\concat_ws2.py python3 .\test.py -f 2-query\concat_ws2.py
@REM python3 .\test.py -f 2-query\check_tsdb.py python3 .\test.py -f 2-query\check_tsdb.py
@REM python3 .\test.py -f 2-query\spread.py python3 .\test.py -f 2-query\spread.py
@REM python3 .\test.py -f 2-query\hyperloglog.py @REM python3 .\test.py -f 2-query\hyperloglog.py
python3 .\test.py -f 2-query\timezone.py python3 .\test.py -f 2-query\timezone.py
...@@ -71,7 +71,7 @@ python3 .\test.py -f 2-query\tan.py ...@@ -71,7 +71,7 @@ python3 .\test.py -f 2-query\tan.py
python3 .\test.py -f 2-query\arcsin.py python3 .\test.py -f 2-query\arcsin.py
python3 .\test.py -f 2-query\arccos.py python3 .\test.py -f 2-query\arccos.py
python3 .\test.py -f 2-query\arctan.py python3 .\test.py -f 2-query\arctan.py
@REM python3 .\test.py -f 2-query\query_cols_tags_and_or.py python3 .\test.py -f 2-query\query_cols_tags_and_or.py
@REM # python3 .\test.py -f 2-query\nestedQuery.py @REM # python3 .\test.py -f 2-query\nestedQuery.py
@REM # TD-15983 subquery output duplicate name column. @REM # TD-15983 subquery output duplicate name column.
@REM # Please Xiangyang Guo modify the following script @REM # Please Xiangyang Guo modify the following script
...@@ -79,24 +79,24 @@ python3 .\test.py -f 2-query\arctan.py ...@@ -79,24 +79,24 @@ python3 .\test.py -f 2-query\arctan.py
python3 .\test.py -f 2-query\avg.py python3 .\test.py -f 2-query\avg.py
python3 .\test.py -f 2-query\elapsed.py python3 .\test.py -f 2-query\elapsed.py
@REM python3 .\test.py -f 2-query\csum.py python3 .\test.py -f 2-query\csum.py
python3 .\test.py -f 2-query\mavg.py python3 .\test.py -f 2-query\mavg.py
python3 .\test.py -f 2-query\diff.py python3 .\test.py -f 2-query\diff.py
python3 .\test.py -f 2-query\sample.py python3 .\test.py -f 2-query\sample.py
@REM python3 .\test.py -f 2-query\function_diff.py python3 .\test.py -f 2-query\function_diff.py
python3 .\test.py -f 2-query\unique.py python3 .\test.py -f 2-query\unique.py
python3 .\test.py -f 2-query\stateduration.py python3 .\test.py -f 2-query\stateduration.py
python3 .\test.py -f 2-query\function_stateduration.py python3 .\test.py -f 2-query\function_stateduration.py
python3 .\test.py -f 2-query\statecount.py python3 .\test.py -f 2-query\statecount.py
@REM python3 .\test.py -f 7-tmq\basic5.py python3 .\test.py -f 7-tmq\basic5.py
@REM python3 .\test.py -f 7-tmq\subscribeDb.py python3 .\test.py -f 7-tmq\subscribeDb.py
@REM python3 .\test.py -f 7-tmq\subscribeDb0.py python3 .\test.py -f 7-tmq\subscribeDb0.py
@REM python3 .\test.py -f 7-tmq\subscribeDb1.py python3 .\test.py -f 7-tmq\subscribeDb1.py
@REM python3 .\test.py -f 7-tmq\subscribeStb.py python3 .\test.py -f 7-tmq\subscribeStb.py
@REM python3 .\test.py -f 7-tmq\subscribeStb0.py python3 .\test.py -f 7-tmq\subscribeStb0.py
@REM python3 .\test.py -f 7-tmq\subscribeStb1.py python3 .\test.py -f 7-tmq\subscribeStb1.py
@REM python3 .\test.py -f 7-tmq\subscribeStb2.py python3 .\test.py -f 7-tmq\subscribeStb2.py
@REM python3 .\test.py -f 7-tmq\subscribeStb3.py python3 .\test.py -f 7-tmq\subscribeStb3.py
@REM python3 .\test.py -f 7-tmq\subscribeStb4.py python3 .\test.py -f 7-tmq\subscribeStb4.py
@REM python3 .\test.py -f 7-tmq\db.py python3 .\test.py -f 7-tmq\db.py
\ No newline at end of file \ No newline at end of file
...@@ -22,10 +22,11 @@ echo Windows Taosd Test ...@@ -22,10 +22,11 @@ echo Windows Taosd Test
for /F "usebackq tokens=*" %%i in (simpletest.bat) do ( for /F "usebackq tokens=*" %%i in (simpletest.bat) do (
for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" ( for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
set /a a+=1 set /a a+=1
set timeNow=!time!
echo !a! Processing %%i echo !a! Processing %%i
call :GetTimeSeconds !time! call :GetTimeSeconds !timeNow!
set time1=!_timeTemp! set time1=!_timeTemp!
echo Start at !time! echo Start at !timeNow!
call %%i ARG1 > result_!a!.txt 2>error_!a!.txt call %%i ARG1 > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. ) if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
) )
...@@ -45,10 +46,11 @@ for /F "usebackq tokens=*" %%i in (simpletest.bat) do ( ...@@ -45,10 +46,11 @@ for /F "usebackq tokens=*" %%i in (simpletest.bat) do (
exit exit
:colorEcho :colorEcho
call :GetTimeSeconds %time% set timeNow=%time%
call :GetTimeSeconds %timeNow%
set time2=%_timeTemp% set time2=%_timeTemp%
set /a interTime=%time2% - %time1% set /a interTime=%time2% - %time1%
echo End at %time% , cast %interTime%s echo End at %timeNow% , cast %interTime%s
echo off echo off
<nul set /p ".=%DEL%" > "%~2" <nul set /p ".=%DEL%" > "%~2"
findstr /v /a:%1 /R "^$" "%~2" nul findstr /v /a:%1 /R "^$" "%~2" nul
...@@ -62,14 +64,14 @@ set tt=%tt::= % ...@@ -62,14 +64,14 @@ set tt=%tt::= %
set index=1 set index=1
for %%a in (%tt%) do ( for %%a in (%tt%) do (
if !index! EQU 1 ( if !index! EQU 1 (
set hh=%%a set /a hh=%%a
)^ )^
else if !index! EQU 2 ( else if !index! EQU 2 (
set mm=%%a set /a mm=%%a
)^ )^
else if !index! EQU 3 ( else if !index! EQU 3 (
set ss=%%a set /a ss=%%a
) )
set /a index=index+1 set /a index=index+1
) )
......
...@@ -128,7 +128,12 @@ void initLogFile() { ...@@ -128,7 +128,12 @@ void initLogFile() {
sprintf(filename,"%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString)); sprintf(filename,"%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
//sprintf(filename, "%s/../log/tmqlog.txt", configDir); //sprintf(filename, "%s/../log/tmqlog.txt", configDir);
#ifdef WINDOWS
for (int i = 2; i < sizeof(filename); i++) {
if (filename[i] == ':') filename[i] = '-';
if (filename[i] == '\0') break;
}
#endif
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) { if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", filename); fprintf(stderr, "Failed to open %s for save result\n", filename);
...@@ -308,8 +313,10 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) ...@@ -308,8 +313,10 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
const char* tbName = tmq_get_table_name(msg);
if (0 != g_stConfInfo.showRowFlag) { if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName:"null table"), totalRows, buf);
if (0 != g_stConfInfo.saveRowFlag) { if (0 != g_stConfInfo.saveRowFlag) {
saveConsumeContentToTbl(pInfo, buf); saveConsumeContentToTbl(pInfo, buf);
} }
...@@ -356,6 +363,8 @@ void build_consumer(SThreadInfo* pInfo) { ...@@ -356,6 +363,8 @@ void build_consumer(SThreadInfo* pInfo) {
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
} }
tmq_conf_set(conf, "msg.with.table.name", "true");
// tmq_conf_set(conf, "client.id", "c-001"); // tmq_conf_set(conf, "client.id", "c-001");
// tmq_conf_set(conf, "enable.auto.commit", "true"); // tmq_conf_set(conf, "enable.auto.commit", "true");
...@@ -528,6 +537,12 @@ void* consumeThreadFunc(void* param) { ...@@ -528,6 +537,12 @@ void* consumeThreadFunc(void* param) {
// save consume result into consumeresult table // save consume result into consumeresult table
saveConsumeResult(pInfo); saveConsumeResult(pInfo);
// save rows from per vgroup
taosFprintfFile(g_fp, "======== consumerId: %d, consume rows from per vgroups ========\n", pInfo->consumerId);
for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
}
return NULL; return NULL;
} }
......
Subproject commit 717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda Subproject commit 1446be95164e6cda3ff88270f7cfa50d8430503f
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册