未验证 提交 b05c263e 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11796 from taosdata/feature/tq

enh(wal): skip read for specific msg
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
#include <unistd.h>
#include "taos.h" #include "taos.h"
static int running = 1; static int running = 1;
...@@ -47,6 +48,7 @@ int32_t init_env() { ...@@ -47,6 +48,7 @@ int32_t init_env() {
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
sleep(1);
pRes = taos_query(pConn, "use abc1"); pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
...@@ -58,6 +60,7 @@ int32_t init_env() { ...@@ -58,6 +60,7 @@ int32_t init_env() {
pRes = pRes =
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"); taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
assert(0);
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
...@@ -265,10 +268,11 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -265,10 +268,11 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
int code;
if (argc > 1) { if (argc > 1) {
printf("env init\n"); printf("env init\n");
code = init_env(); if (init_env() < 0) {
return -1;
}
create_topic(); create_topic();
} }
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
......
...@@ -1314,7 +1314,6 @@ typedef struct { ...@@ -1314,7 +1314,6 @@ typedef struct {
} SMqConsumerLostMsg; } SMqConsumerLostMsg;
typedef struct { typedef struct {
int32_t topicNum;
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
SArray* topicNames; // SArray<char*> SArray* topicNames; // SArray<char*>
...@@ -1322,22 +1321,27 @@ typedef struct { ...@@ -1322,22 +1321,27 @@ typedef struct {
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->topicNum);
tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->cgroup); tlen += taosEncodeString(buf, pReq->cgroup);
for (int32_t i = 0; i < pReq->topicNum; i++) { int32_t topicNum = taosArrayGetSize(pReq->topicNames);
tlen += taosEncodeFixedI32(buf, topicNum);
for (int32_t i = 0; i < topicNum; i++) {
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
} }
return tlen; return tlen;
} }
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->topicNum);
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->cgroup);
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
for (int32_t i = 0; i < pReq->topicNum; i++) { int32_t topicNum;
buf = taosDecodeFixedI32(buf, &topicNum);
pReq->topicNames = taosArrayInit(topicNum, sizeof(void*));
for (int32_t i = 0; i < topicNum; i++) {
char* name; char* name;
buf = taosDecodeString(buf, &name); buf = taosDecodeString(buf, &name);
taosArrayPush(pReq->topicNames, &name); taosArrayPush(pReq->topicNames, &name);
...@@ -1969,7 +1973,6 @@ typedef struct { ...@@ -1969,7 +1973,6 @@ typedef struct {
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
int8_t withTagSchema;
char* qmsg; char* qmsg;
} SMqRebVgReq; } SMqRebVgReq;
...@@ -1984,7 +1987,6 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR ...@@ -1984,7 +1987,6 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen += taosEncodeFixedI8(buf, pReq->withTbName); tlen += taosEncodeFixedI8(buf, pReq->withTbName);
tlen += taosEncodeFixedI8(buf, pReq->withSchema); tlen += taosEncodeFixedI8(buf, pReq->withSchema);
tlen += taosEncodeFixedI8(buf, pReq->withTag); tlen += taosEncodeFixedI8(buf, pReq->withTag);
tlen += taosEncodeFixedI8(buf, pReq->withTagSchema);
if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
tlen += taosEncodeString(buf, pReq->qmsg); tlen += taosEncodeString(buf, pReq->qmsg);
} }
...@@ -2001,7 +2003,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) ...@@ -2001,7 +2003,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf = taosDecodeFixedI8(buf, &pReq->withTbName); buf = taosDecodeFixedI8(buf, &pReq->withTbName);
buf = taosDecodeFixedI8(buf, &pReq->withSchema); buf = taosDecodeFixedI8(buf, &pReq->withSchema);
buf = taosDecodeFixedI8(buf, &pReq->withTag); buf = taosDecodeFixedI8(buf, &pReq->withTag);
buf = taosDecodeFixedI8(buf, &pReq->withTagSchema);
if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
buf = taosDecodeString(buf, &pReq->qmsg); buf = taosDecodeString(buf, &pReq->qmsg);
} }
...@@ -2590,7 +2591,6 @@ typedef struct { ...@@ -2590,7 +2591,6 @@ typedef struct {
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
int8_t withTagSchema;
SArray* blockDataLen; // SArray<int32_t> SArray* blockDataLen; // SArray<int32_t>
SArray* blockData; // SArray<SRetrieveTableRsp*> SArray* blockData; // SArray<SRetrieveTableRsp*>
SArray* blockTbName; // SArray<char*> SArray* blockTbName; // SArray<char*>
...@@ -2609,7 +2609,6 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp ...@@ -2609,7 +2609,6 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
tlen += taosEncodeFixedI8(buf, pRsp->withTbName); tlen += taosEncodeFixedI8(buf, pRsp->withTbName);
tlen += taosEncodeFixedI8(buf, pRsp->withSchema); tlen += taosEncodeFixedI8(buf, pRsp->withSchema);
tlen += taosEncodeFixedI8(buf, pRsp->withTag); tlen += taosEncodeFixedI8(buf, pRsp->withTag);
tlen += taosEncodeFixedI8(buf, pRsp->withTagSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) { for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i); int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i);
...@@ -2632,7 +2631,6 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p ...@@ -2632,7 +2631,6 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag); buf = taosDecodeFixedI8(buf, &pRsp->withTag);
buf = taosDecodeFixedI8(buf, &pRsp->withTagSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) { for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = 0; int32_t bLen = 0;
......
...@@ -192,7 +192,13 @@ int32_t walEndSnapshot(SWal *); ...@@ -192,7 +192,13 @@ int32_t walEndSnapshot(SWal *);
SWalReadHandle *walOpenReadHandle(SWal *); SWalReadHandle *walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *); void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead);
// only for tq usage
// int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead);
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead);
// deprecated // deprecated
#if 0 #if 0
......
...@@ -13,13 +13,13 @@ ...@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include "catalog.h"
#include "tdef.h"
#include "tname.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "catalog.h" #include "os.h"
#include "query.h" #include "query.h"
#include "tdef.h"
#include "tname.h"
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
...@@ -50,7 +50,13 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -50,7 +50,13 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SConnectRsp connectRsp = {0}; SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp); tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
assert(connectRsp.epSet.numOfEps > 0); /*assert(connectRsp.epSet.numOfEps > 0);*/
if (connectRsp.epSet.numOfEps == 0) {
taosMemoryFree(pMsg->pData);
setErrno(pRequest, TSDB_CODE_MND_APP_ERROR);
tsem_post(&pRequest->body.rspSem);
return code;
}
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
...@@ -82,7 +88,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -82,7 +88,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
...@@ -93,7 +99,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { ...@@ -93,7 +99,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
pMsgSendInfo->msgInfo = pRequest->body.requestMsg; pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)
? genericRspCallback
: handleRequestRspFp[TMSG_INDEX(pRequest->type)];
return pMsgSendInfo; return pMsgSendInfo;
} }
...@@ -114,7 +122,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -114,7 +122,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (TSDB_CODE_MND_DB_NOT_EXIST == code) { if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
SUseDbRsp usedbRsp = {0}; SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
struct SCatalog *pCatalog = NULL; struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) { if (usedbRsp.vgVersion >= 0) {
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
......
...@@ -382,12 +382,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -382,12 +382,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ); pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) return NULL; if (pTmq->pTscObj == NULL) return NULL;
/*pTmq->inWaiting = 0;*/
pTmq->status = 0; pTmq->status = 0;
pTmq->pollCnt = 0; pTmq->pollCnt = 0;
pTmq->epoch = 0; pTmq->epoch = 0;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq->epStatus = 0; pTmq->epStatus = 0;
pTmq->epSkipCnt = 0; pTmq->epSkipCnt = 0;
// set conf // set conf
...@@ -509,7 +506,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -509,7 +506,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
SCMSubscribeReq req; SCMSubscribeReq req;
req.topicNum = sz;
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
strcpy(req.cgroup, tmq->groupId); strcpy(req.cgroup, tmq->groupId);
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
...@@ -519,12 +515,16 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -519,12 +515,16 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
char* topicName = taosArrayGetP(container, i); char* topicName = taosArrayGetP(container, i);
SName name = {0}; SName name = {0};
#if 0
char* dbName = getDbOfConnection(tmq->pTscObj); char* dbName = getDbOfConnection(tmq->pTscObj);
if (dbName == NULL) { if (dbName == NULL) {
return TMQ_RESP_ERR__FAIL; return TMQ_RESP_ERR__FAIL;
} }
tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName)); #endif
tNameSetDbName(&name, tmq->pTscObj->acctId, topicName, strlen(topicName));
#if 0
tNameFromString(&name, topicName, T_NAME_TABLE); tNameFromString(&name, topicName, T_NAME_TABLE);
#endif
char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
if (topicFname == NULL) { if (topicFname == NULL) {
...@@ -542,7 +542,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -542,7 +542,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
taosArrayPush(tmq->clientTopics, &topic); taosArrayPush(tmq->clientTopics, &topic);
taosArrayPush(req.topicNames, &topicFname); taosArrayPush(req.topicNames, &topicFname);
#if 0
taosMemoryFree(dbName); taosMemoryFree(dbName);
#endif
} }
int tlen = tSerializeSCMSubscribeReq(NULL, &req); int tlen = tSerializeSCMSubscribeReq(NULL, &req);
......
...@@ -126,7 +126,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) { ...@@ -126,7 +126,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) {
return 0; return 0;
} }
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) { int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse); tlen += taosEncodeFixedI8(buf, pEp->inUse);
...@@ -2745,11 +2744,11 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo ...@@ -2745,11 +2744,11 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1; if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1; if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1; if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
if (0 == astLen && tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2771,6 +2770,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR ...@@ -2771,6 +2770,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1; if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1; if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1; if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1;
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeI32(&decoder, &astLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1;
...@@ -2785,7 +2785,6 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR ...@@ -2785,7 +2785,6 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (pReq->ast == NULL) return -1; if (pReq->ast == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
} else { } else {
if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1;
} }
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -450,7 +450,6 @@ typedef struct { ...@@ -450,7 +450,6 @@ typedef struct {
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
int8_t withTagSchema;
SRWLatch lock; SRWLatch lock;
int32_t sqlLen; int32_t sqlLen;
int32_t astLen; int32_t astLen;
...@@ -517,7 +516,6 @@ typedef struct { ...@@ -517,7 +516,6 @@ typedef struct {
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
int8_t withTagSchema;
SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub
// TODO put -1 into unassignVgs // TODO put -1 into unassignVgs
// SArray* unassignedVgs; // SArray* unassignedVgs;
......
...@@ -237,7 +237,6 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { ...@@ -237,7 +237,6 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pSubNew->withTbName = pSub->withTbName; pSubNew->withTbName = pSub->withTbName;
pSubNew->withSchema = pSub->withSchema; pSubNew->withSchema = pSub->withSchema;
pSubNew->withTag = pSub->withTag; pSubNew->withTag = pSub->withTag;
pSubNew->withTagSchema = pSub->withTagSchema;
pSubNew->vgNum = pSub->vgNum; pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
...@@ -270,7 +269,6 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { ...@@ -270,7 +269,6 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeFixedI8(buf, pSub->withTbName); tlen += taosEncodeFixedI8(buf, pSub->withTbName);
tlen += taosEncodeFixedI8(buf, pSub->withSchema); tlen += taosEncodeFixedI8(buf, pSub->withSchema);
tlen += taosEncodeFixedI8(buf, pSub->withTag); tlen += taosEncodeFixedI8(buf, pSub->withTag);
tlen += taosEncodeFixedI8(buf, pSub->withTagSchema);
void *pIter = NULL; void *pIter = NULL;
int32_t sz = taosHashGetSize(pSub->consumerHash); int32_t sz = taosHashGetSize(pSub->consumerHash);
...@@ -297,7 +295,6 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { ...@@ -297,7 +295,6 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf = taosDecodeFixedI8(buf, &pSub->withTbName); buf = taosDecodeFixedI8(buf, &pSub->withTbName);
buf = taosDecodeFixedI8(buf, &pSub->withSchema); buf = taosDecodeFixedI8(buf, &pSub->withSchema);
buf = taosDecodeFixedI8(buf, &pSub->withTag); buf = taosDecodeFixedI8(buf, &pSub->withTag);
buf = taosDecodeFixedI8(buf, &pSub->withTagSchema);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
......
...@@ -35,11 +35,6 @@ ...@@ -35,11 +35,6 @@
#define MND_SUBSCRIBE_REBALANCE_CNT 3 #define MND_SUBSCRIBE_REBALANCE_CNT 3
enum {
MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
MQ_SUBSCRIBE_STATUS__DELETED,
};
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
...@@ -89,7 +84,6 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, ...@@ -89,7 +84,6 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
pSub->withTbName = pTopic->withTbName; pSub->withTbName = pTopic->withTbName;
pSub->withSchema = pTopic->withSchema; pSub->withSchema = pTopic->withSchema;
pSub->withTag = pTopic->withTag; pSub->withTag = pTopic->withTag;
pSub->withTagSchema = pTopic->withTagSchema;
ASSERT(taosHashGetSize(pSub->consumerHash) == 1); ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
...@@ -115,7 +109,6 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri ...@@ -115,7 +109,6 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
req.withTbName = pSub->withTbName; req.withTbName = pSub->withTbName;
req.withSchema = pSub->withSchema; req.withSchema = pSub->withSchema;
req.withTag = pSub->withTag; req.withTag = pSub->withTag;
req.withTagSchema = pSub->withTagSchema;
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
...@@ -514,9 +507,11 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { ...@@ -514,9 +507,11 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
// TODO replace assert with error check // TODO replace assert with error check
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
// if add more consumer to balanced subscribe, // if add more consumer to balanced subscribe,
// possibly no vg is changed // possibly no vg is changed
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0);
if (rebInput.pTopic) { if (rebInput.pTopic) {
...@@ -673,177 +668,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { ...@@ -673,177 +668,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
} }
#if 0
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq subscribe;
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.consumerGroup;
SArray *newSub = subscribe.topicNames;
int32_t newTopicNum = subscribe.topicNum;
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int32_t oldTopicNum = 0;
bool createConsumer = false;
// create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
pConsumer = mndCreateConsumer(consumerId, cgroup);
createConsumer = true;
} else {
pConsumer->epoch++;
oldSub = pConsumer->currentTopics;
}
pConsumer->currentTopics = newSub;
if (oldSub != NULL) {
oldTopicNum = taosArrayGetSize(oldSub);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) {
// TODO: free memory
return -1;
}
int32_t i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) {
char *newTopicName = NULL;
char *oldTopicName = NULL;
if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic
oldTopicName = taosArrayGetP(oldSub, j);
j++;
} else if (j >= oldTopicNum) {
newTopicName = taosArrayGetP(newSub, i);
i++;
} else {
newTopicName = taosArrayGetP(newSub, i);
oldTopicName = taosArrayGetP(oldSub, j);
int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
if (comp == 0) {
// do nothing
oldTopicName = newTopicName = NULL;
i++;
j++;
continue;
} else if (comp < 0) {
oldTopicName = NULL;
i++;
} else {
newTopicName = NULL;
j++;
}
}
if (oldTopicName != NULL) {
ASSERT(newTopicName == NULL);
// cancel subscribe of old topic
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
ASSERT(pSub);
int32_t csz = taosArrayGetSize(pSub->consumers);
for (int32_t ci = 0; ci < csz; ci++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
if (pSubConsumer->consumerId == consumerId) {
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
for (int32_t vgi = 0; vgi < vgsz; vgi++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp, oldTopicName);
taosArrayPush(pSub->unassignedVg, pConsumerEp);
}
taosArrayRemove(pSub->consumers, ci);
break;
}
}
char *oldTopicNameDup = strdup(oldTopicName);
taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
} else if (newTopicName != NULL) {
ASSERT(oldTopicName == NULL);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
if (pTopic == NULL) {
mError("topic being subscribed not exist: %s", newTopicName);
continue;
}
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
bool createSub = false;
if (pSub == NULL) {
mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
newTopicName);
pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
createSub = true;
mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
}
SMqSubConsumer mqSubConsumer;
mqSubConsumer.consumerId = consumerId;
mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
taosArrayPush(pSub->consumers, &mqSubConsumer);
// if have un assigned vg, assign one to the consumer
if (taosArrayGetSize(pSub->unassignedVg) > 0) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = consumerId;
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
if (pConsumerEp->oldConsumerId == -1) {
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName,
pConsumerEp->consumerId);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
} else {
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName);
}
// to trigger rebalance at once, do not set status active
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
}
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw);
if (!createSub) mndReleaseSubscribe(pMnode, pSub);
mndReleaseTopic(pMnode, pTopic);
}
}
/*if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);*/
// persist consumerObj
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
mndTransDrop(pTrans);
if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
#endif
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) { static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
mndTransProcessRsp(pRsp); mndTransProcessRsp(pRsp);
return 0; return 0;
} }
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
...@@ -82,7 +82,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -82,7 +82,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withTagSchema, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
...@@ -146,7 +145,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -146,7 +145,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTagSchema, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char)); pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
...@@ -234,6 +232,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) { ...@@ -234,6 +232,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
#if 0
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
SName name = {0}; SName name = {0};
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -243,6 +242,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { ...@@ -243,6 +242,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
return mndAcquireDb(pMnode, db); return mndAcquireDb(pMnode, db);
} }
#endif
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) { static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicReq); int32_t contLen = sizeof(SDDropTopicReq);
...@@ -262,15 +262,11 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq ...@@ -262,15 +262,11 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
} }
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) { if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subscribeDbName[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
return -1; return -1;
} }
if ((pCreate->ast == NULL || pCreate->ast[0] == 0) && pCreate->subscribeDbName[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
return -1;
}
return 0; return 0;
} }
...@@ -386,7 +382,7 @@ static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) { ...@@ -386,7 +382,7 @@ static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) {
goto CREATE_TOPIC_OVER; goto CREATE_TOPIC_OVER;
} }
pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name); pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
goto CREATE_TOPIC_OVER; goto CREATE_TOPIC_OVER;
......
...@@ -159,7 +159,6 @@ typedef struct { ...@@ -159,7 +159,6 @@ typedef struct {
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
int8_t withTagSchema;
char* qmsg; char* qmsg;
STqPushHandle pushHandle; STqPushHandle pushHandle;
// SRWLatch lock; // SRWLatch lock;
......
...@@ -31,6 +31,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { ...@@ -31,6 +31,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
#if 0 #if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0); (FTqDelete)taosMemoryFree, 0);
...@@ -401,6 +402,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -401,6 +402,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch); consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch);
} }
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
if (pHeadWithCkSum == NULL) {
return -1;
}
walSetReaderCapacity(pExec->pWalReader, 2048);
SMqDataBlkRsp rsp = {0}; SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockData = taosArrayInit(0, sizeof(void*));
...@@ -414,6 +422,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -414,6 +422,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
break; break;
} }
taosThreadMutexLock(&pExec->pWalReader->mutex);
if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
break;
}
if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) {
walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum);
} else {
walFetchBody(pExec->pWalReader, &pHeadWithCkSum);
}
SWalReadHead* pHead = &pHeadWithCkSum->head;
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
#if 0
SWalReadHead* pHead; SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) { if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time // TODO: no more log, set timer to wait blocking time
...@@ -445,12 +473,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -445,12 +473,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
break; break;
} }
#endif
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
// table subscribe
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
qTaskInfo_t task = pExec->task[workerId]; qTaskInfo_t task = pExec->task[workerId];
ASSERT(task); ASSERT(task);
...@@ -484,6 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -484,6 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush(rsp.blockData, &buf); taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++; rsp.blockNum++;
} }
// db subscribe
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReadHandle* pReader = pExec->pExecReader[workerId]; STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0); tqReadHandleSetMsg(pReader, pCont, 0);
...@@ -789,7 +820,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -789,7 +820,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pExec->withTbName = req.withTbName; pExec->withTbName = req.withTbName;
pExec->withSchema = req.withSchema; pExec->withSchema = req.withSchema;
pExec->withTag = req.withTag; pExec->withTag = req.withTag;
pExec->withTagSchema = req.withTagSchema;
pExec->qmsg = req.qmsg; pExec->qmsg = req.qmsg;
req.qmsg = NULL; req.qmsg = NULL;
......
...@@ -560,8 +560,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { ...@@ -560,8 +560,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
} }
} else if (nodesIsComparisonOp(pOp)) { } else if (nodesIsComparisonOp(pOp)) {
if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
TSDB_DATA_TYPE_BLOB == rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
} }
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
...@@ -569,7 +568,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { ...@@ -569,7 +568,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
} }
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
} else if (nodesIsJsonOp(pOp)){ } else if (nodesIsJsonOp(pOp)) {
if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) { if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
} }
...@@ -588,7 +587,9 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) { ...@@ -588,7 +587,9 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) {
} }
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet}; SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
.pRpc = pCxt->pParseCxt->pTransporter,
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
} }
...@@ -1345,7 +1346,8 @@ static int32_t translateFrom(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -1345,7 +1346,8 @@ static int32_t translateFrom(STranslateContext* pCxt, SSelectStmt* pSelect) {
} }
static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) {
if ((NULL != pSelect->pLimit && pSelect->pLimit->offset < 0) || (NULL != pSelect->pSlimit && pSelect->pSlimit->offset < 0)) { if ((NULL != pSelect->pLimit && pSelect->pLimit->offset < 0) ||
(NULL != pSelect->pSlimit && pSelect->pSlimit->offset < 0)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_OFFSET_LESS_ZERO); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_OFFSET_LESS_ZERO);
} }
...@@ -1650,7 +1652,8 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) { ...@@ -1650,7 +1652,8 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) {
(TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) || (TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) ||
(pKeep2->isDuration && (pKeep2->isDuration &&
(TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) { (TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit, pKeep2->unit); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit,
pKeep2->unit);
} }
int32_t daysToKeep0 = getBigintFromValueNode(pKeep0); int32_t daysToKeep0 = getBigintFromValueNode(pKeep0);
...@@ -1691,7 +1694,8 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete ...@@ -1691,7 +1694,8 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions, bool alter) { static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions,
bool alter) {
if (NULL == pOptions->pDaysPerFile && NULL == pOptions->pKeep) { if (NULL == pOptions->pDaysPerFile && NULL == pOptions->pKeep) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1712,7 +1716,8 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa ...@@ -1712,7 +1716,8 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions, bool alter) { static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions,
bool alter) {
int32_t code = int32_t code =
checkRangeOption(pCxt, "totalBlocks", pOptions->pNumOfBlocks, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS); checkRangeOption(pCxt, "totalBlocks", pOptions->pNumOfBlocks, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -1945,7 +1950,7 @@ static int32_t checkTableTags(STranslateContext* pCxt, SCreateTableStmt* pStmt) ...@@ -1945,7 +1950,7 @@ static int32_t checkTableTags(STranslateContext* pCxt, SCreateTableStmt* pStmt)
SNode* pNode; SNode* pNode;
FOREACH(pNode, pStmt->pTags) { FOREACH(pNode, pStmt->pTags) {
SColumnDefNode* pCol = (SColumnDefNode*)pNode; SColumnDefNode* pCol = (SColumnDefNode*)pNode;
if(pCol->dataType.type == TSDB_DATA_TYPE_JSON && LIST_LENGTH(pStmt->pTags) > 1){ if (pCol->dataType.type == TSDB_DATA_TYPE_JSON && LIST_LENGTH(pStmt->pTags) > 1) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
} }
} }
...@@ -1961,7 +1966,9 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs ...@@ -1961,7 +1966,9 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
} }
SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0); SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0);
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet}; SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
.pRpc = pCxt->pParseCxt->pTransporter,
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
} }
...@@ -2090,7 +2097,7 @@ static SNode* makeIntervalVal(SRetention* pRetension, int8_t precision) { ...@@ -2090,7 +2097,7 @@ static SNode* makeIntervalVal(SRetention* pRetension, int8_t precision) {
} }
int64_t timeVal = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit); int64_t timeVal = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit);
char buf[20] = {0}; char buf[20] = {0};
int32_t len = snprintf(buf, sizeof(buf), "%"PRId64"%c", timeVal, pRetension->freqUnit); int32_t len = snprintf(buf, sizeof(buf), "%" PRId64 "%c", timeVal, pRetension->freqUnit);
pVal->literal = strndup(buf, len); pVal->literal = strndup(buf, len);
if (NULL == pVal->literal) { if (NULL == pVal->literal) {
nodesDestroyNode(pVal); nodesDestroyNode(pVal);
...@@ -2175,8 +2182,8 @@ static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precisi ...@@ -2175,8 +2182,8 @@ static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precisi
return pMeta; return pMeta;
} }
static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt, static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, SRetention* pRetension,
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, SSampleAstInfo* pInfo) { int8_t precision, SSampleAstInfo* pInfo) {
pInfo->pDbName = pStmt->dbName; pInfo->pDbName = pStmt->dbName;
pInfo->pTableName = pStmt->tableName; pInfo->pTableName = pStmt->tableName;
pInfo->pFuncs = createRollupFuncs(pStmt); pInfo->pFuncs = createRollupFuncs(pStmt);
...@@ -2188,8 +2195,8 @@ static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt, ...@@ -2188,8 +2195,8 @@ static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t getRollupAst(STranslateContext* pCxt, static int32_t getRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision,
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, char** pAst, int32_t* pLen) { char** pAst, int32_t* pLen) {
SSampleAstInfo info = {0}; SSampleAstInfo info = {0};
int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info); int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -2207,11 +2214,11 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, ...@@ -2207,11 +2214,11 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
return code; return code;
} }
for (int32_t i = 1; i < num; ++i) { for (int32_t i = 1; i < num; ++i) {
SRetention *pRetension = taosArrayGet(dbCfg.pRetensions, i); SRetention* pRetension = taosArrayGet(dbCfg.pRetensions, i);
STranslateContext cxt = {0}; STranslateContext cxt = {0};
initTranslateContext(pCxt->pParseCxt, &cxt); initTranslateContext(pCxt->pParseCxt, &cxt);
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, 1 == i ? &pReq->pAst1 : &pReq->pAst2,
1 == i ? &pReq->pAst1 : &pReq->pAst2, 1 == i ? &pReq->ast1Len : &pReq->ast2Len); 1 == i ? &pReq->ast1Len : &pReq->ast2Len);
destroyTranslateContext(&cxt); destroyTranslateContext(&cxt);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
break; break;
...@@ -2289,7 +2296,8 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt ...@@ -2289,7 +2296,8 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt
static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableStmt* pStmt) { static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableStmt* pStmt) {
SName tableName; SName tableName;
return doTranslateDropSuperTable(pCxt, toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pStmt->ignoreNotExists); return doTranslateDropSuperTable(pCxt, toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName),
pStmt->ignoreNotExists);
} }
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterReq) { static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterReq) {
...@@ -2618,9 +2626,9 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen ...@@ -2618,9 +2626,9 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) { static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
SName name; SName name;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
// tNameGetFullDbName(&name, pReq->name); tNameGetFullDbName(&name, pReq->name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name); /*tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);*/
pReq->igExists = pStmt->ignoreExists; pReq->igExists = pStmt->ignoreExists;
pReq->withTbName = pStmt->pOptions->withTable; pReq->withTbName = pStmt->pOptions->withTable;
pReq->withSchema = pStmt->pOptions->withSchema; pReq->withSchema = pStmt->pOptions->withSchema;
...@@ -2633,16 +2641,19 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS ...@@ -2633,16 +2641,19 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
const char* dbName;
if (NULL != pStmt->pQuery) { if (NULL != pStmt->pQuery) {
strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName); dbName = ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName;
pCxt->pParseCxt->topicQuery = true; pCxt->pParseCxt->topicQuery = true;
code = translateQuery(pCxt, pStmt->pQuery); code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
} }
} else { } else {
strcpy(pReq->subscribeDbName, pStmt->subscribeDbName); dbName = pStmt->subscribeDbName;
} }
tNameSetDbName(&name, pCxt->pParseCxt->acctId, dbName, strlen(dbName));
tNameGetFullDbName(&name, pReq->subscribeDbName);
return code; return code;
} }
...@@ -2654,8 +2665,9 @@ static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt ...@@ -2654,8 +2665,9 @@ static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList && if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) &&
NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) { NULL == pSelect->pGroupByList && NULL == pSelect->pLimit && NULL == pSelect->pSlimit &&
NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -2763,7 +2775,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt ...@@ -2763,7 +2775,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t readFromFile(char* pName, int32_t *len, char **buf) { static int32_t readFromFile(char* pName, int32_t* len, char** buf) {
int64_t filesize = 0; int64_t filesize = 0;
if (taosStatFile(pName, &filesize, NULL) < 0) { if (taosStatFile(pName, &filesize, NULL) < 0) {
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
...@@ -3408,8 +3420,8 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c ...@@ -3408,8 +3420,8 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema, static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
SKVRowBuilder* pBuilder) { SKVRowBuilder* pBuilder) {
if(pSchema->type == TSDB_DATA_TYPE_JSON){ if (pSchema->type == TSDB_DATA_TYPE_JSON) {
if(pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){ if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal); return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
} }
...@@ -3420,10 +3432,11 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS ...@@ -3420,10 +3432,11 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS
return pCxt->errCode; return pCxt->errCode;
} }
if(pVal->node.resType.type == TSDB_DATA_TYPE_NULL){ if (pVal->node.resType.type == TSDB_DATA_TYPE_NULL) {
// todo // todo
}else{ } else {
tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p), IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]); tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p),
IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -46,7 +46,7 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -46,7 +46,7 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SQueryTableRsp rsp = {.code = code}; SQueryTableRsp rsp = {.code = code};
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
void * msg = rpcMallocCont(contLen); void *msg = rpcMallocCont(contLen);
tSerializeSQueryTableRsp(msg, contLen, &rsp); tSerializeSQueryTableRsp(msg, contLen, &rsp);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
...@@ -87,7 +87,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, ...@@ -87,7 +87,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
void * pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSExplainRsp(pRsp, contLen, &rsp); tSerializeSExplainRsp(pRsp, contLen, &rsp);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
...@@ -107,7 +107,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, ...@@ -107,7 +107,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void * pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
...@@ -223,7 +223,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -223,7 +223,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
showRsp.tableMeta.numOfColumns = cols; showRsp.tableMeta.numOfColumns = cols;
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp); int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
void * pBuf = rpcMallocCont(bufLen); void *pBuf = rpcMallocCont(bufLen);
tSerializeSShowRsp(pBuf, bufLen, &showRsp); tSerializeSShowRsp(pBuf, bufLen, &showRsp);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -403,8 +403,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -403,8 +403,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
bool queryDone = false; bool queryDone = false;
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false; bool needStop = false;
SQWTaskCtx * handles = NULL; SQWTaskCtx *handles = NULL;
SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
...@@ -538,7 +538,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -538,7 +538,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
int32_t code = 0; int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont; STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
...@@ -620,7 +620,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -620,7 +620,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbReq req = {0}; SSchedulerHbReq req = {0};
SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == pMsg->pCont) { if (NULL == pMsg->pCont) {
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
......
...@@ -138,6 +138,91 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { ...@@ -138,6 +138,91 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0; return 0;
} }
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
int32_t code;
// TODO: valid ver
if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if (code < 0) return -1;
}
if (!taosValidFile(pRead->pReadLogTFile)) {
return -1;
}
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
return -1;
}
code = walValidHeadCksum(pHead);
if (code != 0) {
wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
return 0;
}
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) {
int32_t code;
ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1;
return -1;
}
pRead->curVersion++;
return 0;
}
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) {
SWalReadHead *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
*ppHead = ptr;
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) {
return -1;
}
if (pReadHead->version != ver) {
wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (walValidBodyCksum(*ppHead) != 0) {
wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
pRead->curVersion = ver + 1;
return 0;
}
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) { int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) {
taosThreadMutexLock(&pRead->mutex); taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) { if (walReadWithHandle(pRead, ver) < 0) {
...@@ -172,12 +257,14 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -172,12 +257,14 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
if (code != sizeof(SWalHead)) { if (code != sizeof(SWalHead)) {
return -1; return -1;
} }
code = walValidHeadCksum(pRead->pHead); code = walValidHeadCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver); wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (pRead->capacity < pRead->pHead->head.bodyLen) { if (pRead->capacity < pRead->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen); void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
......
...@@ -13,8 +13,6 @@ ...@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
...@@ -298,14 +296,14 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -298,14 +296,14 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
// sync info // sync info for sync module
pWal->writeHead.head.syncMeta = syncMeta; pWal->writeHead.head.syncMeta = syncMeta;
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
// ftruncate // TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
...@@ -313,7 +311,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -313,7 +311,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
} }
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
// ftruncate // TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册