提交 d410ef1c 编写于 作者: L Liu Jicong

fix plan convert error

上级 02e454a0
...@@ -1535,7 +1535,7 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* ...@@ -1535,7 +1535,7 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg*
tlen += taosEncodeFixedU64(buf, pMsg->queryId); tlen += taosEncodeFixedU64(buf, pMsg->queryId);
tlen += taosEncodeFixedU64(buf, pMsg->taskId); tlen += taosEncodeFixedU64(buf, pMsg->taskId);
tlen += taosEncodeFixedU32(buf, pMsg->contentLen); tlen += taosEncodeFixedU32(buf, pMsg->contentLen);
tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); //tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
return tlen; return tlen;
} }
...@@ -1544,7 +1544,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { ...@@ -1544,7 +1544,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
buf = taosDecodeFixedU64(buf, &pMsg->queryId); buf = taosDecodeFixedU64(buf, &pMsg->queryId);
buf = taosDecodeFixedU64(buf, &pMsg->taskId); buf = taosDecodeFixedU64(buf, &pMsg->taskId);
buf = taosDecodeFixedU32(buf, &pMsg->contentLen); buf = taosDecodeFixedU32(buf, &pMsg->contentLen);
buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); //buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
return buf; return buf;
} }
......
...@@ -308,10 +308,10 @@ tmq_conf_t* tmq_conf_new() { ...@@ -308,10 +308,10 @@ tmq_conf_t* tmq_conf_new() {
} }
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id")) { if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value); strcpy(conf->groupId, value);
} }
if (strcmp(key, "client.id")) { if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value); strcpy(conf->clientId, value);
} }
return 0; return 0;
...@@ -359,7 +359,7 @@ tmq_list_t* tmq_list_new() { ...@@ -359,7 +359,7 @@ tmq_list_t* tmq_list_new() {
int32_t tmq_list_append(tmq_list_t* ptr, char* src) { int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
if (ptr->cnt >= ptr->tot-1) return -1; if (ptr->cnt >= ptr->tot-1) return -1;
ptr->elems[ptr->cnt] = src; ptr->elems[ptr->cnt] = strdup(src);
ptr->cnt++; ptr->cnt++;
return 0; return 0;
} }
...@@ -371,8 +371,23 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -371,8 +371,23 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
int32_t sz = topic_list->cnt; int32_t sz = topic_list->cnt;
tmq->clientTopics = taosArrayInit(sz, sizeof(void*)); tmq->clientTopics = taosArrayInit(sz, sizeof(void*));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
char* topicName = strdup(topic_list->elems[i]); char* topicName = topic_list->elems[i];
taosArrayPush(tmq->clientTopics, &topicName);
SName name = {0};
char* dbName = getConnectionDB(tmq->pTscObj);
tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
tNameFromString(&name, topicName, T_NAME_TABLE);
char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN);
if (topicFname == NULL) {
}
tNameExtractFullName(&name, topicFname);
tscDebug("subscribe topic: %s", topicFname);
taosArrayPush(tmq->clientTopics, &topicFname);
/*SMqClientTopic topic = {*/
/*.*/
/*};*/
} }
SCMSubscribeReq req; SCMSubscribeReq req;
req.topicNum = taosArrayGetSize(tmq->clientTopics); req.topicNum = taosArrayGetSize(tmq->clientTopics);
...@@ -396,7 +411,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -396,7 +411,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
} }
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
pRequest->type = TDMT_MND_CREATE_TOPIC; pRequest->type = TDMT_MND_SUBSCRIBE;
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
...@@ -529,7 +544,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, ...@@ -529,7 +544,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
SCMCreateTopicReq req = { SCMCreateTopicReq req = {
.name = (char*) topicFname, .name = (char*) topicFname,
.igExists = 0, .igExists = 1,
.physicalPlan = (char*) pStr, .physicalPlan = (char*) pStr,
.sql = (char*) sql, .sql = (char*) sql,
.logicalPlan = "no logic plan", .logicalPlan = "no logic plan",
......
...@@ -297,22 +297,22 @@ TEST(testCase, driverInit_Test) { ...@@ -297,22 +297,22 @@ TEST(testCase, driverInit_Test) {
//} //}
// //
//TEST(testCase, create_table_Test) { //TEST(testCase, create_table_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); //assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); //TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); //taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); //pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)");
// ASSERT_EQ(taos_errno(pRes), 0); //ASSERT_EQ(taos_errno(pRes), 0);
//
// taos_free_result(pRes); //taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); //pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)");
// ASSERT_NE(taos_errno(pRes), 0); //ASSERT_NE(taos_errno(pRes), 0);
//
// taos_free_result(pRes); //taos_free_result(pRes);
// taos_close(pConn); //taos_close(pConn);
//} //}
// //
//TEST(testCase, create_ctable_Test) { //TEST(testCase, create_ctable_Test) {
...@@ -453,37 +453,37 @@ TEST(testCase, driverInit_Test) { ...@@ -453,37 +453,37 @@ TEST(testCase, driverInit_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
TEST(testCase, show_table_Test) { //TEST(testCase, show_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); //assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show tables"); //TAOS_RES* pRes = taos_query(pConn, "show tables");
if (taos_errno(pRes) != 0) { //if (taos_errno(pRes) != 0) {
printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); //printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); //taos_free_result(pRes);
} //}
pRes = taos_query(pConn, "show abc1.tables"); //pRes = taos_query(pConn, "show abc1.tables");
if (taos_errno(pRes) != 0) { //if (taos_errno(pRes) != 0) {
printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); //printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); //taos_free_result(pRes);
} //}
TAOS_ROW pRow = NULL; //TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); //TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); //int32_t numOfFields = taos_num_fields(pRes);
int32_t count = 0; //int32_t count = 0;
char str[512] = {0}; //char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) { //while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields); //int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%d: %s\n", ++count, str); //printf("%d: %s\n", ++count, str);
} //}
taos_free_result(pRes); //taos_free_result(pRes);
taos_close(pConn); //taos_close(pConn);
} //}
//TEST(testCase, drop_stable_Test) { //TEST(testCase, drop_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...@@ -546,13 +546,23 @@ TEST(testCase, create_topic_Test) { ...@@ -546,13 +546,23 @@ TEST(testCase, create_topic_Test) {
char* sql = "select * from tu"; char* sql = "select * from tu";
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
if (taos_errno(pRes) != 0) {
printf("failed to create topic, reason:%s\n", taos_errstr(pRes));
ASSERT_TRUE(0);
}
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, tmq_subscribe_Test) { TEST(testCase, tmq_subscribe_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", "abc1", 0);
assert(pConn != NULL); assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1"); tmq_conf_set(conf, "group.id", "tg1");
...@@ -593,56 +603,56 @@ TEST(testCase, tmq_commit_TEST) { ...@@ -593,56 +603,56 @@ TEST(testCase, tmq_commit_TEST) {
// taos_close(pConn); // taos_close(pConn);
//} //}
TEST(testCase, projection_query_tables) { //TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); //ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); //TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); //taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); //pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) { //if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); //printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
} //}
taos_free_result(pRes); //taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using st1 tags(1)"); //pRes = taos_query(pConn, "create table tu using st1 tags(1)");
if (taos_errno(pRes) != 0) { //if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); //printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
} //}
taos_free_result(pRes); //taos_free_result(pRes);
for(int32_t i = 0; i < 100000; ++i) { //for(int32_t i = 0; i < 100000; ++i) {
char sql[512] = {0}; //char sql[512] = {0};
sprintf(sql, "insert into tu values(now+%da, %d)", i, i); //sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
TAOS_RES* p = taos_query(pConn, sql); //TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) { //if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p)); //printf("failed to insert data, reason:%s\n", taos_errstr(p));
} //}
taos_free_result(p); //taos_free_result(p);
} //}
pRes = taos_query(pConn, "select * from tu"); //pRes = taos_query(pConn, "select * from tu");
if (taos_errno(pRes) != 0) { //if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); //printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); //taos_free_result(pRes);
ASSERT_TRUE(false); //ASSERT_TRUE(false);
} //}
TAOS_ROW pRow = NULL; //TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); //TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); //int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0}; //char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) { //while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields); //int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str); //printf("%s\n", str);
} //}
taos_free_result(pRes); //taos_free_result(pRes);
taos_close(pConn); //taos_close(pConn);
} //}
//TEST(testCase, projection_query_stables) { //TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......
...@@ -112,6 +112,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -112,6 +112,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;
// Requests handled by VNODE // Requests handled by VNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
......
...@@ -363,8 +363,9 @@ typedef struct SMqConsumerEp { ...@@ -363,8 +363,9 @@ typedef struct SMqConsumerEp {
int64_t consumerId; // -1 for unassigned int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs; int64_t lastConsumerHbTs;
int64_t lastVgHbTs; int64_t lastVgHbTs;
int32_t execLen; uint32_t qmsgLen;
SSubQueryMsg qExec; char* qmsg;
//SSubQueryMsg qExec;
} SMqConsumerEp; } SMqConsumerEp;
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
...@@ -373,7 +374,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon ...@@ -373,7 +374,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
tlen += taosEncodeFixedI32(buf, pConsumerEp->status); tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); //tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
tlen += taosEncodeFixedU32(buf, pConsumerEp->qmsgLen);
tlen += taosEncodeBinary(buf, pConsumerEp->qmsg, pConsumerEp->qmsgLen);
return tlen; return tlen;
} }
...@@ -382,8 +385,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu ...@@ -382,8 +385,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf = taosDecodeFixedI32(buf, &pConsumerEp->status); buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); //buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen; buf = taosDecodeFixedU32(buf, &pConsumerEp->qmsgLen);
buf = taosDecodeBinary(buf, (void**)&pConsumerEp->qmsg, pConsumerEp->qmsgLen);
return buf; return buf;
} }
...@@ -402,11 +406,12 @@ typedef struct SMqSubscribeObj { ...@@ -402,11 +406,12 @@ typedef struct SMqSubscribeObj {
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj)); SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj));
pSub->key[0] = 0;
pSub->epoch = 0;
if (pSub == NULL) { if (pSub == NULL) {
return NULL; return NULL;
} }
pSub->key[0] = 0;
pSub->epoch = 0;
pSub->availConsumer = taosArrayInit(0, sizeof(int64_t)); pSub->availConsumer = taosArrayInit(0, sizeof(int64_t));
if (pSub->availConsumer == NULL) { if (pSub->availConsumer == NULL) {
free(pSub); free(pSub);
...@@ -433,7 +438,7 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { ...@@ -433,7 +438,7 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
free(pSub); free(pSub);
return NULL; return NULL;
} }
return NULL; return pSub;
} }
static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) { static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
......
...@@ -56,7 +56,9 @@ void mndCleanupConsumer(SMnode *pMnode) {} ...@@ -56,7 +56,9 @@ void mndCleanupConsumer(SMnode *pMnode) {}
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer); int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, tlen); int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) goto CM_ENCODE_OVER; if (pRaw == NULL) goto CM_ENCODE_OVER;
void* buf = malloc(tlen); void* buf = malloc(tlen);
...@@ -68,34 +70,6 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { ...@@ -68,34 +70,6 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
#if 0
int32_t topicNum = taosArrayGetSize(pConsumer->topics);
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
int32_t len = strlen(pConsumer->cgroup);
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CM_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, topicNum, CM_ENCODE_OVER);
for (int i = 0; i < topicNum; i++) {
int32_t len;
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i);
len = strlen(pConsumerTopic->name);
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len, CM_ENCODE_OVER);
int vgSize;
if (pConsumerTopic->vgroups == NULL) {
vgSize = 0;
} else {
vgSize = listNEles(pConsumerTopic->vgroups);
}
SDB_SET_INT32(pRaw, dataPos, vgSize, CM_ENCODE_OVER);
for (int j = 0; j < vgSize; j++) {
// SList* head;
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
}
}
#endif
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
...@@ -116,53 +90,35 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { ...@@ -116,53 +90,35 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CONSUME_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
if (sver != MND_CONSUMER_VER_NUMBER) { if (sver != MND_CONSUMER_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto CONSUME_DECODE_OVER; goto CM_DECODE_OVER;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj)); SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj));
if (pRow == NULL) goto CONSUME_DECODE_OVER; if (pRow == NULL) goto CM_DECODE_OVER;
SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
if (pConsumer == NULL) goto CONSUME_DECODE_OVER; if (pConsumer == NULL) goto CM_DECODE_OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
int32_t len; int32_t len;
SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
void* buf = malloc(len); void* buf = malloc(len);
if (buf == NULL) goto CONSUME_DECODE_OVER; if (buf == NULL) goto CM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, buf, len, CONSUME_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
tDecodeSMqConsumerObj(buf, pConsumer);
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER); if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
goto CM_DECODE_OVER;
}
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
#if 0 CM_DECODE_OVER:
SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER); if (terrno != TSDB_CODE_SUCCESS) {
for (int i = 0; i < topicNum; i++) {
int32_t topicLen;
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
if (pConsumerTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
// TODO
return NULL;
}
/*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/
SDB_GET_INT32(pRaw, dataPos, &topicLen, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pConsumerTopic->name, topicLen, CONSUME_DECODE_OVER);
int32_t vgSize;
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
}
#endif
CONSUME_DECODE_OVER:
if (terrno != 0) {
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
tfree(pRow); tfree(pRow);
return NULL; return NULL;
......
...@@ -96,25 +96,27 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -96,25 +96,27 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
// build msg // build msg
SMqSetCVgReq req = {
.vgId = pCEp->vgId, SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen);
.oldConsumerId = -1, if (pReq == NULL) {
.newConsumerId = consumerId, terrno = TSDB_CODE_OUT_OF_MEMORY;
}; return -1;
strcpy(req.cgroup, cgroup); }
strcpy(req.topicName, topic); strcpy(pReq->cgroup, cgroup);
strcpy(req.sql, pTopic->sql); strcpy(pReq->topicName, topic);
strcpy(req.logicalPlan, pTopic->logicalPlan); pReq->sql = strdup(pTopic->sql);
strcpy(req.physicalPlan, pTopic->physicalPlan); pReq->logicalPlan = strdup(pTopic->logicalPlan);
memcpy(&req.msg, &pCEp->qExec, pCEp->execLen); pReq->physicalPlan = strdup(pTopic->physicalPlan);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); pReq->msg.contentLen = pCEp->qmsgLen;
memcpy(pReq->msg.msg, pCEp->qmsg, pCEp->qmsgLen);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
void *reqStr = malloc(tlen); void *reqStr = malloc(tlen);
if (reqStr == NULL) { if (reqStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
void *abuf = reqStr; void *abuf = reqStr;
tEncodeSMqSetCVgReq(abuf, &req); tEncodeSMqSetCVgReq(&abuf, pReq);
// persist msg // persist msg
STransAction action = {0}; STransAction action = {0};
...@@ -128,6 +130,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -128,6 +130,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SSdbRaw *pRaw = mndSubActionEncode(pSub); SSdbRaw *pRaw = mndSubActionEncode(pSub);
mndTransAppendRedolog(pTrans, pRaw); mndTransAppendRedolog(pTrans, pRaw);
free(pReq);
tfree(topic); tfree(topic);
tfree(cgroup); tfree(cgroup);
} }
...@@ -146,6 +149,14 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -146,6 +149,14 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
//convert phyplan to dag //convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray; SArray *pArray;
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0);
plan->execNode.inUse = 0;
strcpy(plan->execNode.epAddr[0].fqdn, "localhost");
plan->execNode.epAddr[0].port = 6030;
plan->execNode.nodeId = 2;
plan->execNode.numOfEps = 1;
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
return -1; return -1;
} }
...@@ -157,11 +168,17 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -157,11 +168,17 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo* pTaskInfo = taosArrayGet(pArray, i); STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);
CEp.vgId = pTaskInfo->addr.nodeId; CEp.vgId = pTaskInfo->addr.nodeId;
CEp.qmsg = malloc(sizeof(pTaskInfo->msg->contentLen));
if (CEp.qmsg == NULL) {
return -1;
}
memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);
taosArrayPush(unassignedVg, &CEp); taosArrayPush(unassignedVg, &CEp);
} }
qDestroyQueryDag(pDag); /*qDestroyQueryDag(pDag);*/
return 0; return 0;
} }
...@@ -178,9 +195,9 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume ...@@ -178,9 +195,9 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
}; };
strcpy(req.cgroup, pConsumer->cgroup); strcpy(req.cgroup, pConsumer->cgroup);
strcpy(req.topicName, pTopic->name); strcpy(req.topicName, pTopic->name);
strcpy(req.sql, pTopic->sql); req.sql = strdup(pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan); req.logicalPlan = strdup(pTopic->logicalPlan);
strcpy(req.physicalPlan, pTopic->physicalPlan); req.physicalPlan = strdup(pTopic->physicalPlan);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen); void *reqStr = malloc(tlen);
if (reqStr == NULL) { if (reqStr == NULL) {
...@@ -208,19 +225,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume ...@@ -208,19 +225,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
void mndCleanupSubscribe(SMnode *pMnode) {} void mndCleanupSubscribe(SMnode *pMnode) {}
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t tlen = tEncodeSubscribeObj(NULL, pSub); int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
int32_t size = tlen + MND_SUBSCRIBE_RESERVE_SIZE; int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
if (pRaw == NULL) goto SUB_ENCODE_OVER; if (pRaw == NULL) goto SUB_ENCODE_OVER;
void *buf = malloc(tlen); void *buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) goto SUB_ENCODE_OVER;
goto SUB_ENCODE_OVER;
}
void *abuf = buf;
tEncodeSubscribeObj(&buf, pSub); void *abuf = buf;
tEncodeSubscribeObj(&abuf, pSub);
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
...@@ -228,6 +244,8 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { ...@@ -228,6 +244,8 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
terrno = TSDB_CODE_SUCCESS;
SUB_ENCODE_OVER: SUB_ENCODE_OVER:
if (terrno != 0) { if (terrno != 0) {
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
...@@ -259,9 +277,9 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { ...@@ -259,9 +277,9 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0; int32_t dataPos = 0;
int32_t tlen; int32_t tlen;
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
void *buf = malloc(tlen + 1); void *buf = malloc(tlen + 1);
if (buf == NULL) goto SUB_DECODE_OVER; if (buf == NULL) goto SUB_DECODE_OVER;
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
...@@ -269,8 +287,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { ...@@ -269,8 +287,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
goto SUB_DECODE_OVER; goto SUB_DECODE_OVER;
} }
terrno = TSDB_CODE_SUCCESS;
SUB_DECODE_OVER: SUB_DECODE_OVER:
if (terrno != 0) { if (terrno != TSDB_CODE_SUCCESS) {
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
// TODO free subscribeobj // TODO free subscribeobj
tfree(pRow); tfree(pRow);
...@@ -379,10 +399,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -379,10 +399,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
j++; j++;
} else if (j >= oldTopicNum) { } else if (j >= oldTopicNum) {
newTopicName = taosArrayGet(newSub, i); newTopicName = taosArrayGetP(newSub, i);
i++; i++;
} else { } else {
newTopicName = taosArrayGet(newSub, i); newTopicName = taosArrayGetP(newSub, i);
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
int comp = compareLenPrefixedStr(newTopicName, oldTopicName); int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
...@@ -466,6 +486,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -466,6 +486,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
char* key = mndMakeSubscribeKey(consumerGroup, newTopicName);
strcpy(pSub->key, key);
// set unassigned vg // set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
//TODO: disable alter //TODO: disable alter
...@@ -486,7 +508,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -486,7 +508,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
} }
SSdbRaw *pRaw = mndSubActionEncode(pSub); SSdbRaw *pRaw = mndSubActionEncode(pSub);
/*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/ sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw); mndTransAppendRedolog(pTrans, pRaw);
#if 0 #if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
...@@ -519,8 +541,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -519,8 +541,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
mndTransAppendRedolog(pTrans, pTopicRaw); mndTransAppendRedolog(pTrans, pTopicRaw);
#endif #endif
mndReleaseTopic(pMnode, pTopic); /*mndReleaseTopic(pMnode, pTopic);*/
mndReleaseSubscribe(pMnode, pSub); /*mndReleaseSubscribe(pMnode, pSub);*/
} }
} }
// part3. persist consumerObj // part3. persist consumerObj
......
...@@ -60,7 +60,9 @@ void mndCleanupTopic(SMnode *pMnode) {} ...@@ -60,7 +60,9 @@ void mndCleanupTopic(SMnode *pMnode) {}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE; int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
int32_t size = sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) goto TOPIC_ENCODE_OVER; if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
...@@ -74,12 +76,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -74,12 +76,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1; SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->logicalPlan)+1, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
...@@ -135,7 +135,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -135,7 +135,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->physicalPlan = calloc(len + 1, sizeof(char)); pTopic->physicalPlan = calloc(len + 1, sizeof(char));
...@@ -144,7 +144,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -144,7 +144,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
...@@ -231,6 +231,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { ...@@ -231,6 +231,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) {
} }
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
mDebug("topic:%s to create", pCreate->name);
SMqTopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
...@@ -273,7 +274,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { ...@@ -273,7 +274,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return 0; return 0;
} else { } else {
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST; terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
mError("db:%s, failed to create since %s", createTopicReq.name, terrstr()); mError("topic:%s, failed to create since already exists", createTopicReq.name);
return -1; return -1;
} }
} }
......
...@@ -87,6 +87,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f ...@@ -87,6 +87,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
static const char* jkPnodeType = "Type"; static const char* jkPnodeType = "Type";
static int32_t getPnodeTypeSize(cJSON* json) { static int32_t getPnodeTypeSize(cJSON* json) {
switch (getNumber(json, jkPnodeType)) { switch (getNumber(json, jkPnodeType)) {
case OP_StreamScan:
case OP_TableScan: case OP_TableScan:
case OP_DataBlocksOptScan: case OP_DataBlocksOptScan:
case OP_TableSeqScan: case OP_TableSeqScan:
...@@ -869,6 +870,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { ...@@ -869,6 +870,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
SPhyNode* phyNode = (SPhyNode*)obj; SPhyNode* phyNode = (SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_TableScan: case OP_TableScan:
case OP_StreamScan:
case OP_DataBlocksOptScan: case OP_DataBlocksOptScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return tableScanNodeFromJson(json, obj); return tableScanNodeFromJson(json, obj);
...@@ -1187,14 +1189,14 @@ SQueryDag* qJsonToDag(const cJSON* pRoot) { ...@@ -1187,14 +1189,14 @@ SQueryDag* qJsonToDag(const cJSON* pRoot) {
if(pDag == NULL) { if(pDag == NULL) {
return NULL; return NULL;
} }
pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "numOfSubplans")); pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "Number"));
pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "queryId")); pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "QueryId"));
pDag->pSubplans = taosArrayInit(0, sizeof(SArray)); pDag->pSubplans = taosArrayInit(0, sizeof(SArray));
if (pDag->pSubplans == NULL) { if (pDag->pSubplans == NULL) {
free(pDag); free(pDag);
return NULL; return NULL;
} }
cJSON* pLevels = cJSON_GetObjectItem(pRoot, "pSubplans"); cJSON* pLevels = cJSON_GetObjectItem(pRoot, "Subplans");
int level = cJSON_GetArraySize(pLevels); int level = cJSON_GetArraySize(pLevels);
for(int i = 0; i < level; i++) { for(int i = 0; i < level; i++) {
SArray* plansOneLevel = taosArrayInit(0, sizeof(void*)); SArray* plansOneLevel = taosArrayInit(0, sizeof(void*));
......
...@@ -1476,7 +1476,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { ...@@ -1476,7 +1476,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
SSubQueryMsg *pMsg = (SSubQueryMsg*) msg; SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;
pMsg->header.vgId = htonl(tInfo.addr.nodeId); pMsg->header.vgId = tInfo.addr.nodeId;
pMsg->sId = schMgmt.sId; pMsg->sId = schMgmt.sId;
pMsg->queryId = plan->id.queryId; pMsg->queryId = plan->id.queryId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册