diff --git a/src/kit/taosdemox/subscribe.json b/src/kit/taosdemox/subscribe.json index 6dfacdd6ed112b398cf38731147cafc02879efe2..f70b1213a884af7d593b3d7366268ce03de1d239 100644 --- a/src/kit/taosdemox/subscribe.json +++ b/src/kit/taosdemox/subscribe.json @@ -5,13 +5,13 @@ "port": 6030, "user": "root", "password": "taosdata", - "databases": "db01", - "super_table_query": + "databases": "db", + "specified_table_query": {"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes", - "sqls": [{"sql": "select avg(c1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}] + "sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}] }, - "sub_table_query": - {"stblname": "stb01", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes", - "sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}] - } + "super_table_query": + {"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes", + "sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}] + } } diff --git a/src/kit/taosdemox/taosdemox.c b/src/kit/taosdemox/taosdemox.c index 0e2ec6d7ae82feabb6a9fa15b5425067776b3292..5c187030c521193e024062763c342cd0d928133c 100644 --- a/src/kit/taosdemox/taosdemox.c +++ b/src/kit/taosdemox/taosdemox.c @@ -4269,23 +4269,24 @@ void *subSubscribeProcess(void *sarg) { } while (0); // start loop to consume result + TAOS_RES* res = NULL; while (1) { for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { if (1 == g_queryInfo.subQueryInfo.subscribeMode) { continue; } - TAOS_RES* res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]); + res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]); if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.subQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID); } getResult(res, tmpFile); - taos_free_result(res); } } } + taos_free_result(res); for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], g_queryInfo.subQueryInfo.subscribeKeepProgress); @@ -4328,23 +4329,24 @@ void *superSubscribeProcess(void *sarg) { } while (0); // start loop to consume result + TAOS_RES* res = NULL; while (1) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { if (1 == g_queryInfo.superQueryInfo.subscribeMode) { continue; } - TAOS_RES* res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]); + res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]); if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID); } getResult(res, tmpFile); - taos_free_result(res); } } } + taos_free_result(res); for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress); diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index a07607e615a46c34c1a570cacc42c8f36769b754..d29a738a10f6734ee62b45867582ed75c667f78f 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -259,7 +259,7 @@ typedef struct { int16_t bytes[TSDB_MAX_COLUMNS]; int32_t numOfReads; int8_t maxReplica; - int8_t reserved0[0]; + int8_t reserved0[1]; uint16_t payloadLen; char payload[]; } SShowObj; diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index e4df562d81c7ef91fe1ff62eba6611d76e0a3ff1..e4e4a7a054ff194a578dbaad75b2ae17bfe6f1ad 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -60,7 +60,7 @@ typedef struct SSdbRow { int32_t (*fpReq)(SMnodeMsg *pMsg); int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code); char reserveForSync[24]; - SWalHead pHead[]; + SWalHead pHead; } SSdbRow; typedef struct { diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 9d2bfe0ce15f2579bdffd95095ca235bdadff286..ae495108b3a3d0afd479057d9828ae4c69d4537f 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -274,7 +274,7 @@ static int32_t sdbGetSyncVersion(int32_t vgId, uint64_t *fver, uint64_t *vver) { // failed to forward, need revert insert static void sdbHandleFailedConfirm(SSdbRow *pRow) { - SWalHead *pHead = pRow->pHead; + SWalHead *pHead = &pRow->pHead; int32_t action = pHead->msgType % 10; sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pRow->pObj, @@ -1012,7 +1012,7 @@ static void sdbFreeQueue() { } static int32_t sdbWriteToQueue(SSdbRow *pRow, int32_t qtype) { - SWalHead *pHead = pRow->pHead; + SWalHead *pHead = &pRow->pHead; if (pHead->len > TSDB_MAX_WAL_SIZE) { sdbError("vgId:1, wal len:%d exceeds limit, hver:%" PRIu64, pHead->len, pHead->version); @@ -1051,8 +1051,8 @@ static int32_t sdbWriteFwdToQueue(int32_t vgId, void *wparam, int32_t qtype, voi return TSDB_CODE_VND_OUT_OF_MEMORY; } - memcpy(pRow->pHead, pHead, sizeof(SWalHead) + pHead->len); - pRow->rowData = pRow->pHead->cont; + memcpy(&pRow->pHead, pHead, sizeof(SWalHead) + pHead->len); + pRow->rowData = pRow->pHead.cont; int32_t code = sdbWriteToQueue(pRow, qtype); if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) code = 0; @@ -1073,7 +1073,7 @@ static int32_t sdbWriteRowToQueue(SSdbRow *pInputRow, int32_t action) { memcpy(pRow, pInputRow, sizeof(SSdbRow)); pRow->processedCount = 1; - SWalHead *pHead = pRow->pHead; + SWalHead *pHead = &pRow->pHead; pRow->rowData = pHead->cont; (*pTable->fpEncode)(pRow); @@ -1103,9 +1103,9 @@ static void *sdbWorkerFp(void *pWorker) { for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow); sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pRow->pMsg, pRow->pObj, - pRow->pHead->version); + pRow->pHead.version); - pRow->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pRow : NULL, pRow->pHead, qtype, NULL); + pRow->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pRow : NULL, &pRow->pHead, qtype, NULL); if (pRow->code > 0) pRow->code = 0; sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pRow->pMsg, pRow->code); @@ -1122,7 +1122,7 @@ static void *sdbWorkerFp(void *pWorker) { sdbConfirmForward(1, pRow, pRow->code); } else { if (qtype == TAOS_QTYPE_FWD) { - syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code); + syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code); } sdbFreeFromQueue(pRow); }