diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index eed6391d5c97af340b5827166fe4c00ba23cd21a..1e51288e026d68c1aeb6d3681559bb7a24da6b42 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -892,7 +892,7 @@ SDataType createDataType(uint8_t type) { } SDataType createVarLenDataType(uint8_t type, const SToken* pLen) { - SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = tDataTypes[type].bytes }; + SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = strtol(pLen->z, NULL, 10) }; return dt; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 803d8e00d8c217c7e404536a482e960b30509866..1111ff2887b52c7b58701b02d6a60ab1d6a42b89 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -28,6 +28,13 @@ pSql += index; \ } while (0) +#define NEXT_TOKEN_WITH_PREV(pSql, sToken) \ + do { \ + int32_t index = 0; \ + sToken = tStrGetToken(pSql, &index, true); \ + pSql += index; \ + } while (0) + #define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index) \ do { \ sToken = tStrGetToken(pSql, &index, false); \ @@ -352,7 +359,7 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time sToken = tStrGetToken(pTokenEnd, &index, false); pTokenEnd += index; - if (sToken.type == TK_MINUS || sToken.type == TK_NK_PLUS) { + if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) { index = 0; valueToken = tStrGetToken(pTokenEnd, &index, false); pTokenEnd += index; @@ -748,7 +755,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, SToken sToken; char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" for (int i = 0; i < pCxt->tags.numOfBound; ++i) { - NEXT_TOKEN(pCxt->pSql, sToken); + NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken); SSchema* pSchema = &pTagsSchema[pCxt->tags.boundedColumns[i]]; param.schema = pSchema; CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg)); @@ -814,7 +821,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, SToken sToken = {0}; // 1. set the parsed value from sql string for (int i = 0; i < spd->numOfBound; ++i) { - NEXT_TOKEN(pCxt->pSql, sToken); + NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken); SSchema *pSchema = &schema[spd->boundedColumns[i] - 1]; param.schema = pSchema; getMemRowAppendInfo(schema, pBuilder->rowType, spd, i, ¶m.toffset, ¶m.colIdx); diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 3cd4549ad068140416ad0f33c5284526a25d8581..7f161570905de9db01c6941aff38dd53b0345931 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -320,7 +320,7 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) { *tokenId = TK_NK_COMMENT; return i; } - *tokenId = TK_MINUS; + *tokenId = TK_NK_MINUS; return 1; } case '(': { @@ -674,7 +674,7 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) { } else { // support parse the -/+number format - if ((isPrevOptr) && (t0.type == TK_MINUS || t0.type == TK_NK_PLUS)) { + if ((isPrevOptr) && (t0.type == TK_NK_MINUS || t0.type == TK_NK_PLUS)) { len = tGetToken(&str[*i + t0.n], &type); if (type == TK_NK_INTEGER || type == TK_NK_FLOAT) { t0.type = type; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 70652901ef1a83c27e6d69fa5548e5a3639d1eb1..f20bc5f2e6d113b56c4532d3175ff8c2ef6b8cdb 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -873,12 +873,22 @@ static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStm return TSDB_CODE_SUCCESS; } +static int32_t calcTypeBytes(SDataType dt) { + if (TSDB_DATA_TYPE_BINARY == dt.type) { + return dt.bytes + VARSTR_HEADER_SIZE; + } else if (TSDB_DATA_TYPE_NCHAR == dt.type) { + return dt.bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; + } else { + return dt.bytes; + } +} + static int32_t columnNodeToField(SNodeList* pList, SArray** pArray) { *pArray = taosArrayInit(LIST_LENGTH(pList), sizeof(SField)); SNode* pNode; FOREACH(pNode, pList) { SColumnDefNode* pCol = (SColumnDefNode*)pNode; - SField field = { .type = pCol->dataType.type, .bytes = pCol->dataType.bytes }; + SField field = { .type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType) }; strcpy(field.name, pCol->colName); taosArrayPush(*pArray, &field); } diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 66c7c8620df10728ef54c7ccf08c5e854d493f3b..7940d7f436335c10ec0745b98e96ab3142186810 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -31,10 +31,10 @@ extern "C" { #define TIMER_MAX_MS 0x7FFFFFFF #define ENV_TICK_TIMER_MS 1000 #define PING_TIMER_MS 1000 -#define ELECT_TIMER_MS_MIN 1500 -#define ELECT_TIMER_MS_MAX 3000 +#define ELECT_TIMER_MS_MIN 150 +#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 300 +#define HEARTBEAT_TIMER_MS 30 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index eaf4d56f677dad7203f753b3541561a4f7eba912..cfa87048a713d735bb0c8f09ee08a7fcedf590f3 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -160,6 +160,11 @@ typedef struct SSyncNode { SSyncLogStore* pLogStore; SyncIndex commitIndex; + // timer ms init + int32_t pingBaseLine; + int32_t electBaseLine; + int32_t hbBaseLine; + // ping timer tmr_h pPingTimer; int32_t pingTimerMS; diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 1b702c252867dc8a909048d2bea2c960989f40b4..a4bb11afc63e723b29b2de438081a928f4a8366c 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -44,7 +44,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); // ---- misc ---- int32_t syncUtilRand(int32_t max); -int32_t syncUtilElectRandomMS(); +int32_t syncUtilElectRandomMS(int32_t min, int32_t max); int32_t syncUtilQuorum(int32_t replicaNum); cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilRaftId2Json(const SRaftId* p); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index facb0180f9b3e8a2ebf70b72d6e44a5ac0bd9a9a..b92317ef3f75621a665b2fbb7ec8618f75e30a2b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -242,9 +242,14 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { assert(pSyncNode->pLogStore != NULL); pSyncNode->commitIndex = SYNC_INDEX_INVALID; + // timer ms init + pSyncNode->pingBaseLine = PING_TIMER_MS; + pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN; + pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS; + // init ping timer pSyncNode->pPingTimer = NULL; - pSyncNode->pingTimerMS = PING_TIMER_MS; + pSyncNode->pingTimerMS = pSyncNode->pingBaseLine; atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0); pSyncNode->FpPingTimerCB = syncNodeEqPingTimer; @@ -252,7 +257,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init elect timer pSyncNode->pElectTimer = NULL; - pSyncNode->electTimerMS = syncUtilElectRandomMS(); + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); atomic_store_64(&pSyncNode->electTimerLogicClock, 0); atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0); pSyncNode->FpElectTimerCB = syncNodeEqElectTimer; @@ -260,7 +265,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init heartbeat timer pSyncNode->pHeartbeatTimer = NULL; - pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS; + pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0); pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer; @@ -394,7 +399,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - int32_t electMS = syncUtilElectRandomMS(); + int32_t electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); ret = syncNodeRestartElectTimer(pSyncNode, electMS); return ret; } @@ -763,7 +768,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeoutDestroy(pSyncMsg); // reset timer ms - pSyncNode->electTimerMS = syncUtilElectRandomMS(); + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 9486405331bd3d5352eb8b1ce73cbff950b3feaa..8fc17ccb51ddc56623d47b35ddd73df9b655418a 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -96,7 +96,10 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { int32_t syncUtilRand(int32_t max) { return taosRand() % max; } -int32_t syncUtilElectRandomMS() { return ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); } +int32_t syncUtilElectRandomMS(int32_t min, int32_t max) { + assert(min > 0 && max > 0 && max >= min); + return min + syncUtilRand(max - min); +} int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } diff --git a/source/libs/sync/test/syncUtilTest.cpp b/source/libs/sync/test/syncUtilTest.cpp index 663db3a7b301eb8e020ab5f2f95066ca06d8a308..ae2afd2f536458ef26b5fe7731ba06e403f4af5f 100644 --- a/source/libs/sync/test/syncUtilTest.cpp +++ b/source/libs/sync/test/syncUtilTest.cpp @@ -16,7 +16,7 @@ void logTest() { void electRandomMSTest() { for (int i = 0; i < 10; ++i) { - int32_t ms = syncUtilElectRandomMS(); + int32_t ms = syncUtilElectRandomMS(150, 300); printf("syncUtilElectRandomMS: %d \n", ms); } } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 0c2b4fe1f4dad86661e70b06282b3cd8e9db86a9..cc0576e1f36f565a5acb0c09d096f767fc9806ca 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -21,4 +21,7 @@ # ---- query ./test.sh -f tsim/query/interval.sim + +# ---- tmq +./test.sh -f tsim/tmq/basic.sim #======================b1-end=============== diff --git a/tests/script/tsim/insert/basic0.sim b/tests/script/tsim/insert/basic0.sim index 6c24662be70706db3e0fba36e9d5368a30749bfe..b83cc224f8af8731b3b91d0d1b3163193622e51c 100644 --- a/tests/script/tsim/insert/basic0.sim +++ b/tests/script/tsim/insert/basic0.sim @@ -42,9 +42,8 @@ sql insert into ct1 values(now+0s, 10, 2.0, 3.0) sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, 12, 2.2, 3.2)(now+3s, 13, 2.3, 3.3) sql insert into ct2 values(now+0s, 10, 2.0, 3.0) sql insert into ct2 values(now+1s, 11, 2.1, 3.1)(now+2s, 12, 2.2, 3.2)(now+3s, 13, 2.3, 3.3) -# after fix bug, modify sql_error to sql -sql_error insert into ct1 values(now+4s, -14, -2.4, -3.4) ct2 values(now+4s, -14, -2.4, -3.4) -sql_error insert into ct1 values(now+5s, -15, -2.5, -3.5)(now+6s, -16, -2.6, -3.6) ct2 values(now+5s, -15, -2.5, -3.5)(now+6s, -16, -2.6, -3.6) +sql insert into ct1 values(now+4s, -14, -2.4, -3.4) ct2 values(now+4s, -14, -2.4, -3.4) +sql insert into ct1 values(now+5s, -15, -2.5, -3.5)(now+6s, -16, -2.6, -3.6) ct2 values(now+5s, -15, -2.5, -3.5)(now+6s, -16, -2.6, -3.6) sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0) @@ -52,7 +51,7 @@ sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0) #=================================================================== print =============== query data from child table sql select * from ct1 -if $rows != 4 then # after fix bug, modify 4 to 7 +if $rows != 7 then return -1 endi if $data01 != 10 then @@ -82,23 +81,23 @@ if $rows != 1 then endi print $data00 $data01 $data02 -if $data00 != 4 then +if $data00 != 7 then return -1 endi print =============== select count(column) from child table sql select count(ts), count(c1), count(c2), count(c3) from ct1 print $data00 $data01 $data02 $data03 -if $data00 != 4 then +if $data00 != 7 then return -1 endi -if $data01 != 4 then +if $data01 != 7 then return -1 endi -if $data02 != 4 then +if $data02 != 7 then return -1 endi -if $data03 != 4 then +if $data03 != 7 then return -1 endi @@ -112,13 +111,13 @@ print $data00 $data01 $data02 $data03 if $rows != 1 then return -1 endi -if $data00 != 10 then +if $data00 != -16 then return -1 endi -if $data01 != 2.00000 then +if $data01 != -2.60000 then return -1 endi -if $data02 != 3.000000000 then +if $data02 != -3.600000000 then return -1 endi @@ -144,13 +143,13 @@ print $data00 $data01 $data02 $data03 if $rows != 1 then return -1 endi -if $data00 != 46 then +if $data00 != 1 then return -1 endi -if $data01 != 8.599999905 then +if $data01 != 1.099999905 then return -1 endi -if $data02 != 12.600000000 then +if $data02 != 2.100000000 then return -1 endi @@ -232,7 +231,7 @@ system sh/exec.sh -n dnode1 -s start sleep 2000 sql select * from ct1 -if $rows != 4 then # after fix bug, modify 4 to 7 +if $rows != 7 then return -1 endi if $data01 != 10 then @@ -262,23 +261,23 @@ if $rows != 1 then endi print $data00 $data01 $data02 -if $data00 != 4 then +if $data00 != 7 then return -1 endi print =============== select count(column) from child table sql select count(ts), count(c1), count(c2), count(c3) from ct1 print $data00 $data01 $data02 $data03 -if $data00 != 4 then +if $data00 != 7 then return -1 endi -if $data01 != 4 then +if $data01 != 7 then return -1 endi -if $data02 != 4 then +if $data02 != 7 then return -1 endi -if $data03 != 4 then +if $data03 != 7 then return -1 endi @@ -292,13 +291,13 @@ print $data00 $data01 $data02 $data03 if $rows != 1 then return -1 endi -if $data00 != 10 then +if $data00 != -16 then return -1 endi -if $data01 != 2.00000 then +if $data01 != -2.60000 then return -1 endi -if $data02 != 3.000000000 then +if $data02 != -3.600000000 then return -1 endi @@ -324,13 +323,13 @@ print $data00 $data01 $data02 $data03 if $rows != 1 then return -1 endi -if $data00 != 46 then +if $data00 != 1 then return -1 endi -if $data01 != 8.599999905 then +if $data01 != 1.099999905 then return -1 endi -if $data02 != 12.600000000 then +if $data02 != 2.100000000 then return -1 endi diff --git a/tests/script/tsim/tmq/basic.sim b/tests/script/tsim/tmq/basic.sim new file mode 100644 index 0000000000000000000000000000000000000000..a091e45061398eee78b7ef939439118739d2f81f --- /dev/null +++ b/tests/script/tsim/tmq/basic.sim @@ -0,0 +1,75 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c wal -v 1 +system sh/exec.sh -n dnode1 -s start +sleep 500 +sql connect + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 100 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +#root@trd02 /data2/dnode $ tmq_demo --help +#Used to tmq_demo +# -c Configuration directory, default is +# -d The name of the database to be created, default is tmqdb +# -s The name of the super table to be created, default is stb +# -f The file of result, default is ./tmqResult.txt +# -w The path of vnode of wal, default is /data2/dnode/data/vnodes/vnode2/wal +# -t numOfThreads, default is 1 +# -n numOfTables, default is 1 +# -v numOfVgroups, default is 1 +# -a runMode, default is 0 +# -l numOfColumn, default is 1 +# -q ratio, default is 1.000000 +# -b batchNumOfRow, default is 1 +# -r totalRowsOfPerTbl, default is 10000 +# -m startTimestamp, default is 1640966400000 [2022-01-01 00:00:00] +# -g showMsgFlag, default is 0 +# +#system_content ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg +system ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg +print result-> $system_content + +sql show databases +print ===> $rows $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != tmqdb then + return -1 +endi + +sql use tmqdb +sql show tables +print ===> $rows $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != stb0 then + return -1 +endi + +sql select count(*) from stb0 +print ===> $rows $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != 10000 then + return -1 +endi +#system sh/exec.sh -n dnode1 -s stop -x SIGINT