提交 4a8f6098 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/tkv

......@@ -135,30 +135,30 @@ void basic_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t *rk,
void sync_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
tmq_resp_err_t err;
if ((err = tmq_subscribe(rk, topics))) {
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
return;
}
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(rk, 500);
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
tmq_commit(rk, NULL, 0);
tmq_commit(tmq, NULL, 0);
}
}
err = tmq_consumer_close(rk);
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
......
......@@ -265,7 +265,8 @@ typedef struct {
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
SSchema schema;
int32_t numOfColumns;
SSchema pSchema[];
} SMAlterStbReq;
typedef struct {
......
......@@ -273,12 +273,11 @@
#define NEW_TK_SOFFSET 64
#define NEW_TK_LIMIT 65
#define NEW_TK_OFFSET 66
#define NEW_TK_NK_LR 67
#define NEW_TK_ASC 68
#define NEW_TK_DESC 69
#define NEW_TK_NULLS 70
#define NEW_TK_FIRST 71
#define NEW_TK_LAST 72
#define NEW_TK_ASC 67
#define NEW_TK_DESC 68
#define NEW_TK_NULLS 69
#define NEW_TK_FIRST 70
#define NEW_TK_LAST 71
#define TK_SPACE 300
#define TK_COMMENT 301
......
......@@ -105,8 +105,9 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
pthread_mutex_lock(&appInfo.mutex);
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
SAppInstInfo* p = NULL;
if (pInst == NULL) {
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p, key);
......
......@@ -5,14 +5,26 @@ MESSAGE(STATUS "build parser unit test")
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest ${SOURCE_LIST})
ADD_EXECUTABLE(clientTest clientTests.cpp)
TARGET_LINK_LIBRARIES(
clientTest
PUBLIC os util common transport parser catalog scheduler function gtest taos qcom
)
ADD_EXECUTABLE(tmqTest tmqTest.cpp)
TARGET_LINK_LIBRARIES(
tmqTest
PUBLIC os util common transport parser catalog scheduler function gtest taos qcom
)
TARGET_INCLUDE_DIRECTORIES(
clientTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc"
)
TARGET_INCLUDE_DIRECTORIES(
tmqTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc"
)
......@@ -564,119 +564,6 @@ TEST(testCase, insert_test) {
}
#endif
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
#endif
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_TEST) {
}
#if 0
TEST(testCase, projection_query_tables) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "../inc/clientInt.h"
#include "taos.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
taosInitGlobalCfg();
// taos_init();
}
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_Test) {
}
#endif
......@@ -301,10 +301,12 @@ typedef struct {
uint64_t uid;
uint64_t dbUid;
int32_t version;
int32_t nextColId;
int32_t numOfColumns;
int32_t numOfTags;
SSchema* pTags;
SSchema* pColumns;
SRWLatch lock;
SSchema* pSchema;
} SStbObj;
typedef struct {
......@@ -617,7 +619,7 @@ typedef struct SMqTopicObj {
int64_t createTime;
int64_t updateTime;
uint64_t uid;
uint64_t dbUid;
int64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t sqlLen;
......
......@@ -84,12 +84,20 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT64(pRaw, dataPos, pStb->uid, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER)
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i];
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
......@@ -137,17 +145,26 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pStb->uid, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER)
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pStb->pSchema = calloc(totalCols, sizeof(SSchema));
if (pStb->pSchema == NULL) {
pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema));
pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema));
if (pStb->pColumns == NULL || pStb->pTags == NULL) {
goto STB_DECODE_OVER;
}
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i];
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
......@@ -183,13 +200,24 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
taosWLockLatch(&pOld->lock);
int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
int32_t totalSize = totalCols * sizeof(SSchema);
if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
void *pSchema = malloc(totalSize);
if (pOld->numOfColumns < pNew->numOfColumns) {
void *pSchema = malloc(pOld->numOfColumns * sizeof(SSchema));
if (pSchema != NULL) {
free(pOld->pSchema);
pOld->pSchema = pSchema;
free(pOld->pColumns);
pOld->pColumns = pSchema;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
taosWUnLockLatch(&pOld->lock);
}
}
if (pOld->numOfTags < pNew->numOfTags) {
void *pSchema = malloc(pOld->numOfTags * sizeof(SSchema));
if (pSchema != NULL) {
free(pOld->pTags);
pOld->pTags = pSchema;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
......@@ -199,9 +227,11 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
pOld->nextColId = pNew->nextColId;
pOld->numOfColumns = pNew->numOfColumns;
pOld->numOfTags = pNew->numOfTags;
memcpy(pOld->pSchema, pNew->pSchema, totalSize);
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
taosWUnLockLatch(&pOld->lock);
return 0;
}
......@@ -242,9 +272,9 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
req.type = TD_SUPER_TABLE;
req.stbCfg.suid = pStb->uid;
req.stbCfg.nCols = pStb->numOfColumns;
req.stbCfg.pSchema = pStb->pSchema;
req.stbCfg.pSchema = pStb->pColumns;
req.stbCfg.nTagCols = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns;
req.stbCfg.pTagSchema = pStb->pTags;
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = malloc(contLen);
......@@ -442,20 +472,32 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
stbObj.dbUid = pDb->uid;
stbObj.version = 1;
stbObj.nextColId = 1;
stbObj.numOfColumns = pCreate->numOfColumns;
stbObj.numOfTags = pCreate->numOfTags;
int32_t totalCols = stbObj.numOfColumns + stbObj.numOfTags;
int32_t totalSize = totalCols * sizeof(SSchema);
stbObj.pSchema = malloc(totalSize);
if (stbObj.pSchema == NULL) {
stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema));
if (stbObj.pColumns == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(stbObj.pSchema, pCreate->pSchema, totalSize);
memcpy(stbObj.pColumns, pCreate->pSchema, stbObj.numOfColumns * sizeof(SSchema));
for (int32_t i = 0; i < totalCols; ++i) {
stbObj.pSchema[i].colId = i + 1;
stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema));
if (stbObj.pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(stbObj.pTags, pCreate->pSchema + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema));
for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
stbObj.pColumns[i].colId = stbObj.nextColId;
stbObj.nextColId++;
}
for (int32_t i = 0; i < stbObj.numOfTags; ++i) {
stbObj.pTags[i].colId = stbObj.nextColId;
stbObj.nextColId++;
}
int32_t code = -1;
......@@ -538,25 +580,29 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
}
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
SSchema *pSchema = &pAlter->schema;
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
pAlter->numOfColumns = htonl(pAlter->numOfColumns);
if (pSchema->type <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
for (int32_t i = 0; i < pAlter->numOfColumns; ++i) {
SSchema *pSchema = &pAlter->pSchema[i];
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
if (pSchema->type <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
}
return 0;
......@@ -769,14 +815,24 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
pMeta->suid = htobe64(pStb->uid);
pMeta->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < totalCols; ++i) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pStb->pSchema[i];
SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pMeta->pSchema[i + pStb->numOfColumns];
SSchema *pSrcSchema = &pStb->pTags[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
......@@ -789,11 +845,11 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
}
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) {
SSdb *pSdb = pMnode->pSdb;
int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
void *buf = malloc(bufSize);
int32_t len = 0;
int32_t contLen = 0;
SSdb *pSdb = pMnode->pSdb;
int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
void *buf = malloc(bufSize);
int32_t len = 0;
int32_t contLen = 0;
STableMetaRsp *pRsp = NULL;
for (int32_t i = 0; i < num; ++i) {
......@@ -803,7 +859,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
stb->tversion = ntohs(stb->tversion);
if ((contLen + sizeof(STableMetaRsp)) > bufSize) {
bufSize = contLen + (num -i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
bufSize = contLen + (num - i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
......@@ -812,9 +868,9 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
strcpy(pRsp->dbFName, stb->dbFName);
strcpy(pRsp->tbName, stb->stbName);
strcpy(pRsp->stbName, stb->stbName);
mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName);
SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName);
if (pDb == NULL) {
pRsp->numOfColumns = -1;
......@@ -826,7 +882,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName);
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
......@@ -836,7 +892,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr());
continue;
}
taosRLockLatch(&pStb->lock);
if (stb->suid == pStb->uid && stb->sversion == pStb->version) {
......@@ -845,17 +901,17 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
mndReleaseStb(pMnode, pStb);
continue;
}
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t len = totalCols * sizeof(SSchema);
contLen += sizeof(STableMetaRsp) + len;
if (contLen > bufSize) {
bufSize = contLen + (num -i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
bufSize = contLen + (num - i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
pRsp->numOfTags = htonl(pStb->numOfTags);
pRsp->numOfColumns = htonl(pStb->numOfColumns);
pRsp->precision = pDb->cfg.precision;
......@@ -864,15 +920,25 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
pRsp->sversion = htonl(pStb->version);
pRsp->suid = htobe64(pStb->uid);
pRsp->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < totalCols; ++i) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pRsp->pSchema[i];
SSchema *pSrcSchema = &pStb->pSchema[i];
SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pRsp->pSchema[i + pStb->numOfColumns];
SSchema *pSrcSchema = &pStb->pTags[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
......@@ -890,7 +956,6 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
return 0;
}
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
SSdb *pSdb = pMnode->pSdb;
......
......@@ -30,7 +30,7 @@ extern SToken nil_token;
SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode);
SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode);
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableName, const SToken* pColumnName);
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName);
SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral);
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
SNode* addMinusSign(SAstCreateContext* pCxt, SNode* pNode);
......
......@@ -187,7 +187,7 @@ table_reference(A) ::= joined_table(B).
table_primary(A) ::= table_name(B) alias_opt(C). { PARSER_TRACE; A = createRealTableNode(pCxt, NULL, &B, &C); }
table_primary(A) ::= db_name(B) NK_DOT table_name(C) alias_opt(D). { PARSER_TRACE; A = createRealTableNode(pCxt, &B, &C, &D); }
table_primary(A) ::= subquery(B) alias_opt(C). { PARSER_TRACE; A = createTempTableNode(pCxt, B, &C); }
table_primary ::= parenthesized_joined_table.
table_primary(A) ::= parenthesized_joined_table(B). { PARSER_TRACE; A = B; }
%type alias_opt { SToken }
%destructor alias_opt { PARSER_DESTRUCTOR_TRACE; }
......@@ -297,9 +297,9 @@ query_expression_body(A) ::=
query_expression_body(B) UNION ALL query_expression_body(D). { PARSER_TRACE; A = createSetOperator(pCxt, SET_OP_TYPE_UNION_ALL, B, D); }
query_primary(A) ::= query_specification(B). { PARSER_TRACE; A = B; }
query_primary(A) ::=
NK_LP query_expression_body(B)
order_by_clause_opt limit_clause_opt slimit_clause_opt NK_RP. { PARSER_TRACE; A = B;}
//query_primary(A) ::=
// NK_LP query_expression_body(B)
// order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { PARSER_TRACE; A = B;}
%type order_by_clause_opt { SNodeList* }
%destructor order_by_clause_opt { PARSER_DESTRUCTOR_TRACE; nodesDestroyList($$); }
......@@ -317,7 +317,7 @@ limit_clause_opt(A) ::= LIMIT NK_INTEGER(B) OFFSET NK_INTEGER(C).
limit_clause_opt(A) ::= LIMIT NK_INTEGER(C) NK_COMMA NK_INTEGER(B). { PARSER_TRACE; A = createLimitNode(pCxt, &B, &C); }
/************************************************ subquery ************************************************************/
subquery(A) ::= NK_LR query_expression(B) NK_RP. { PARSER_TRACE; A = B; }
subquery(A) ::= NK_LP query_expression(B) NK_RP. { PARSER_TRACE; A = B; }
/************************************************ search_condition ****************************************************/
search_condition(A) ::= boolean_value_expression(B). { PARSER_TRACE; A = B; }
......
......@@ -28,6 +28,7 @@ typedef struct SQuery {
} SQuery;
int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery);
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery);
#ifdef __cplusplus
}
......
......@@ -60,14 +60,14 @@ SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode
return nodesListAppend(pList, pNode);
}
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableName, const SToken* pColumnName) {
if (!checkTableName(pCxt, pTableName) || !checkColumnName(pCxt, pColumnName)) {
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName) {
if (!checkTableName(pCxt, pTableAlias) || !checkColumnName(pCxt, pColumnName)) {
return NULL;
}
SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_OUT_OF_MEM(col);
if (NULL != pTableName) {
strncpy(col->tableName, pTableName->z, pTableName->n);
if (NULL != pTableAlias) {
strncpy(col->tableAlias, pTableAlias->z, pTableAlias->n);
}
strncpy(col->colName, pColumnName->z, pColumnName->n);
return (SNode*)col;
......@@ -151,6 +151,13 @@ SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const
CHECK_OUT_OF_MEM(realTable);
if (NULL != pDbName) {
strncpy(realTable->table.dbName, pDbName->z, pDbName->n);
} else {
strcpy(realTable->table.dbName, pCxt->pQueryCxt->db);
}
if (NULL != pTableAlias && TK_NIL != pTableAlias->type) {
strncpy(realTable->table.tableAlias, pTableAlias->z, pTableAlias->n);
} else {
strncpy(realTable->table.tableAlias, pTableName->z, pTableName->n);
}
strncpy(realTable->table.tableName, pTableName->z, pTableName->n);
return (SNode*)realTable;
......@@ -160,6 +167,9 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
STempTableNode* tempTable = (STempTableNode*)nodesMakeNode(QUERY_NODE_TEMP_TABLE);
CHECK_OUT_OF_MEM(tempTable);
tempTable->pSubquery = pSubquery;
if (NULL != pTableAlias && TK_NIL != pTableAlias->type) {
strncpy(tempTable->table.tableAlias, pTableAlias->z, pTableAlias->n);
}
return (SNode*)tempTable;
}
......
此差异已折叠。
......@@ -25,9 +25,85 @@ typedef void (*FFree)(void*);
extern void* NewParseAlloc(FMalloc);
extern void NewParse(void*, int, SToken, void*);
extern void NewParseFree(void*, FFree);
extern void NewParseTrace(FILE*, char*);
static uint32_t toNewTokenId(uint32_t tokenId) {
// #define 1
// #define NEW_TK_AND 2
// #define NEW_TK_UNION 3
// #define NEW_TK_ALL 4
// #define NEW_TK_MINUS 5
// #define NEW_TK_EXCEPT 6
// #define NEW_TK_INTERSECT 7
// #define NEW_TK_NK_PLUS 8
// #define NEW_TK_NK_MINUS 9
// #define NEW_TK_NK_STAR 10
// #define NEW_TK_NK_SLASH 11
// #define NEW_TK_NK_REM 12
// #define NEW_TK_SHOW 13
// #define NEW_TK_DATABASES 14
// #define NEW_TK_NK_INTEGER 15
// #define NEW_TK_NK_FLOAT 16
// #define NEW_TK_NK_STRING 17
// #define NEW_TK_NK_BOOL 18
// #define NEW_TK_TIMESTAMP 19
// #define NEW_TK_NK_VARIABLE 20
// #define NEW_TK_NK_COMMA 21
// #define NEW_TK_NK_ID 22
// #define NEW_TK_NK_LP 23
// #define NEW_TK_NK_RP 24
// #define NEW_TK_NK_DOT 25
// #define NEW_TK_BETWEEN 26
// #define NEW_TK_NOT 27
// #define NEW_TK_IS 28
// #define NEW_TK_NULL 29
// #define NEW_TK_NK_LT 30
// #define NEW_TK_NK_GT 31
// #define NEW_TK_NK_LE 32
// #define NEW_TK_NK_GE 33
// #define NEW_TK_NK_NE 34
// #define 35
// #define NEW_TK_LIKE 36
// #define NEW_TK_MATCH 37
// #define NEW_TK_NMATCH 38
// #define NEW_TK_IN 39
// #define NEW_TK_FROM 40
// #define NEW_TK_AS 41
// #define NEW_TK_JOIN 42
// #define NEW_TK_ON 43
// #define NEW_TK_INNER 44
// #define NEW_TK_SELECT 45
// #define NEW_TK_DISTINCT 46
// #define 47
// #define NEW_TK_PARTITION 48
// #define NEW_TK_BY 49
// #define NEW_TK_SESSION 50
// #define NEW_TK_STATE_WINDOW 51
// #define NEW_TK_INTERVAL 52
// #define NEW_TK_SLIDING 53
// #define NEW_TK_FILL 54
// #define NEW_TK_VALUE 55
// #define NEW_TK_NONE 56
// #define NEW_TK_PREV 57
// #define NEW_TK_LINEAR 58
// #define NEW_TK_NEXT 59
// #define NEW_TK_GROUP 60
// #define NEW_TK_HAVING 61
// #define NEW_TK_ORDER 62
// #define NEW_TK_SLIMIT 63
// #define NEW_TK_SOFFSET 64
// #define NEW_TK_LIMIT 65
// #define NEW_TK_OFFSET 66
// #define NEW_TK_NK_LR 67
// #define NEW_TK_ASC 68
// #define NEW_TK_DESC 69
// #define NEW_TK_NULLS 70
// #define NEW_TK_FIRST 71
// #define NEW_TK_LAST 72
switch (tokenId) {
case TK_OR:
return NEW_TK_OR;
case TK_UNION:
return NEW_TK_UNION;
case TK_ALL:
......@@ -54,10 +130,14 @@ static uint32_t toNewTokenId(uint32_t tokenId) {
return NEW_TK_NK_COMMA;
case TK_DOT:
return NEW_TK_NK_DOT;
case TK_EQ:
return NEW_TK_NK_EQ;
case TK_SELECT:
return NEW_TK_SELECT;
case TK_DISTINCT:
return NEW_TK_DISTINCT;
case TK_WHERE:
return NEW_TK_WHERE;
case TK_AS:
return NEW_TK_AS;
case TK_FROM:
......@@ -70,6 +150,10 @@ static uint32_t toNewTokenId(uint32_t tokenId) {
return NEW_TK_ASC;
case TK_DESC:
return NEW_TK_DESC;
case TK_SPACE:
break;
default:
printf("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!tokenId = %d\n", tokenId);
}
return tokenId;
}
......@@ -125,6 +209,7 @@ int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery) {
default:
NewParse(pParser, t0.type, t0, &cxt);
// NewParseTrace(stdout, "");
if (!cxt.valid) {
goto abort_parse;
}
......@@ -147,12 +232,18 @@ abort_parse:
// STableMeta* pMeta;
// } SNamespace;
typedef enum ESqlClause {
SQL_CLAUSE_FROM = 1,
SQL_CLAUSE_WHERE
} ESqlClause;
typedef struct STranslateContext {
SParseContext* pParseCxt;
int32_t errCode;
SMsgBuf msgBuf;
SArray* pNsLevel; // element is SArray*, the element of this subarray is STableNode*
int32_t currLevel;
ESqlClause currClause;
} STranslateContext;
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
......@@ -177,19 +268,28 @@ static int32_t generateSyntaxErrMsg(STranslateContext* pCxt, int32_t errCode, co
}
static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
SArray* pTables = NULL;
if (taosArrayGetSize(pCxt->pNsLevel) > pCxt->currLevel) {
pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
size_t currTotalLevel = taosArrayGetSize(pCxt->pNsLevel);
if (currTotalLevel > pCxt->currLevel) {
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
taosArrayPush(pTables, &pTable);
} else {
pTables = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
do {
SArray* pTables = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
if (pCxt->currLevel == currTotalLevel) {
taosArrayPush(pTables, &pTable);
}
taosArrayPush(pCxt->pNsLevel, &pTables);
++currTotalLevel;
} while (currTotalLevel <= pCxt->currLevel);
}
taosArrayPush(pTables, &pTable);
return TSDB_CODE_SUCCESS;
}
static SName* toName(const SRealTableNode* pRealTable, SName* pName) {
strncpy(pName->dbname, pRealTable->table.dbName, strlen(pRealTable->table.dbName));
strncpy(pName->dbname, pRealTable->table.tableName, strlen(pRealTable->table.tableName));
static SName* toName(int32_t acctId, const SRealTableNode* pRealTable, SName* pName) {
pName->type = TSDB_TABLE_NAME_T;
pName->acctId = acctId;
strcpy(pName->dbname, pRealTable->table.dbName);
strcpy(pName->tname, pRealTable->table.tableName);
return pName;
}
......@@ -213,26 +313,42 @@ static SNodeList* getProjectList(SNode* pNode) {
return NULL;
}
static void setColumnInfoBySchema(const STableNode* pTable, const SSchema* pColSchema, SColumnNode* pCol) {
strcpy(pCol->dbName, pTable->dbName);
strcpy(pCol->tableAlias, pTable->tableAlias);
strcpy(pCol->tableName, pTable->tableName);
strcpy(pCol->colName, pColSchema->name);
if ('\0' == pCol->node.aliasName[0]) {
strcpy(pCol->node.aliasName, pColSchema->name);
}
pCol->colId = pColSchema->colId;
pCol->colType = pColSchema->type;
pCol->node.resType.bytes = pColSchema->bytes;
}
static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SColumnNode* pCol) {
pCol->pProjectRef = (SNode*)pExpr;
pExpr->pAssociationList = nodesListAppend(pExpr->pAssociationList, (SNode*)pCol);
strcpy(pCol->tableAlias, pTable->tableAlias);
strcpy(pCol->colName, pExpr->aliasName);
pCol->node.resType = pExpr->resType;
}
static int32_t createColumnNodeByTable(const STableNode* pTable, SNodeList* pList) {
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
for (int32_t i = 0; i < nums; ++i) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
pCol->colId = pMeta->schema[i].colId;
pCol->colType = pMeta->schema[i].type;
pCol->node.resType.bytes = pMeta->schema[i].bytes;
setColumnInfoBySchema(pTable, pMeta->schema + i, pCol);
nodesListAppend(pList, (SNode*)pCol);
}
} else {
SNodeList* pProjectList = getProjectList(((STempTableNode*)pTable)->pSubquery);
SNode* pNode;
FOREACH(pNode, pProjectList) {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
pCol->pProjectRef = (SNode*)pExpr;
pExpr->pAssociationList = nodesListAppend(pExpr->pAssociationList, (SNode*)pCol);
pCol->node.resType = pExpr->resType;
setColumnInfoByExpr(pTable, (SExprNode*)pNode, pCol);
nodesListAppend(pList, (SNode*)pCol);
}
}
......@@ -245,9 +361,7 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
for (int32_t i = 0; i < nums; ++i) {
if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) {
pCol->colId = pMeta->schema[i].colId;
pCol->colType = pMeta->schema[i].type;
pCol->node.resType.bytes = pMeta->schema[i].bytes;
setColumnInfoBySchema(pTable, pMeta->schema + i, pCol);
found = true;
break;
}
......@@ -258,9 +372,7 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
FOREACH(pNode, pProjectList) {
SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp(pCol->colName, pExpr->aliasName)) {
pCol->pProjectRef = (SNode*)pExpr;
pExpr->pAssociationList = nodesListAppend(pExpr->pAssociationList, (SNode*)pCol);
pCol->node.resType = pExpr->resType;
setColumnInfoByExpr(pTable, pExpr, pCol);
found = true;
break;
}
......@@ -269,45 +381,74 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
return found;
}
static bool doTranslateExpr(SNode* pNode, void* pContext) {
STranslateContext* pCxt = (STranslateContext*)pContext;
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN: {
SColumnNode* pCol = (SColumnNode*)pNode;
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
size_t nums = taosArrayGetSize(pTables);
bool hasTableAlias = ('\0' != pCol->tableAlias[0]);
bool found = false;
for (size_t i = 0; i < nums; ++i) {
STableNode* pTable = taosArrayGetP(pTables, i);
if (hasTableAlias) {
if (belongTable(pCxt->pParseCxt->db, pCol, pTable)) {
if (findAndSetColumn(pCol, pTable)) {
break;
}
generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_INVALID_COLUMN, pCol->colName);
return false;
}
} else {
if (findAndSetColumn(pCol, pTable)) {
if (found) {
generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_AMBIGUOUS_COLUMN, pCol->colName);
return false;
}
found = true;
}
}
static bool translateColumnWithPrefix(STranslateContext* pCxt, SColumnNode* pCol) {
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
size_t nums = taosArrayGetSize(pTables);
for (size_t i = 0; i < nums; ++i) {
STableNode* pTable = taosArrayGetP(pTables, i);
if (belongTable(pCxt->pParseCxt->db, pCol, pTable)) {
if (findAndSetColumn(pCol, pTable)) {
break;
}
break;
generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_INVALID_COLUMN, pCol->colName);
return false;
}
case QUERY_NODE_VALUE:
break; // todo check literal format
case QUERY_NODE_OPERATOR: {
}
return true;
}
break;
static bool translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNode* pCol) {
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
size_t nums = taosArrayGetSize(pTables);
bool found = false;
for (size_t i = 0; i < nums; ++i) {
STableNode* pTable = taosArrayGetP(pTables, i);
if (findAndSetColumn(pCol, pTable)) {
if (found) {
generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_AMBIGUOUS_COLUMN, pCol->colName);
return false;
}
found = true;
}
}
if (!found) {
generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_INVALID_COLUMN, pCol->colName);
return false;
}
return true;
}
static bool translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
if ('\0' != pCol->tableAlias[0]) {
return translateColumnWithPrefix(pCxt, pCol);
}
return translateColumnWithoutPrefix(pCxt, pCol);
}
// check literal format
static bool translateValue(STranslateContext* pCxt, SValueNode* pVal) {
return true;
}
static bool translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
return true;
}
static bool translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
return true;
}
static bool doTranslateExpr(SNode* pNode, void* pContext) {
STranslateContext* pCxt = (STranslateContext*)pContext;
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN:
return translateColumn(pCxt, (SColumnNode*)pNode);
case QUERY_NODE_VALUE:
return translateValue(pCxt, (SValueNode*)pNode);
case QUERY_NODE_OPERATOR:
return translateOperator(pCxt, (SOperatorNode*)pNode);
case QUERY_NODE_FUNCTION:
break; // todo
return translateFunction(pCxt, (SFunctionNode*)pNode);
case QUERY_NODE_TEMP_TABLE:
return translateSubquery(pCxt, ((STempTableNode*)pNode)->pSubquery);
default:
......@@ -331,15 +472,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
switch (nodeType(pTable)) {
case QUERY_NODE_REAL_TABLE: {
SRealTableNode* pRealTable = (SRealTableNode*)pTable;
if ('\0' == pRealTable->table.dbName[0]) {
strcpy(pRealTable->table.dbName, pCxt->pParseCxt->db);
}
if ('\0' == pRealTable->table.tableAlias[0]) {
strcpy(pRealTable->table.tableAlias, pRealTable->table.tableName);
}
SName name;
code = catalogGetTableMeta(
pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), toName(pRealTable, &name), &(pRealTable->pMeta));
code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet),
toName(pCxt->pParseCxt->acctId, pRealTable, &name), &(pRealTable->pMeta));
if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_TABLE_NOT_EXIST, pRealTable->table.tableName);
}
......@@ -371,10 +506,16 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
return code;
}
static int32_t translateFrom(STranslateContext* pCxt, SNode* pTable) {
pCxt->currClause = SQL_CLAUSE_FROM;
return translateTable(pCxt, pTable);
}
static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool* pIsSelectStar) {
if (NULL == pSelect->pProjectionList) { // select * ...
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
size_t nums = taosArrayGetSize(pTables);
pSelect->pProjectionList = nodesMakeList();
for (size_t i = 0; i < nums; ++i) {
STableNode* pTable = taosArrayGetP(pTables, i);
createColumnNodeByTable(pTable, pSelect->pProjectionList);
......@@ -383,14 +524,18 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool
} else {
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = TSDB_CODE_SUCCESS;
code = translateTable(pCxt, pSelect->pFromTable);
code = translateFrom(pCxt, pSelect->pFromTable);
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, pSelect->pWhere);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateExprList(pCxt, pSelect->pGroupByList);
}
bool isSelectStar = false;
if (TSDB_CODE_SUCCESS == code) {
code = translateStar(pCxt, pSelect, &isSelectStar);
......@@ -398,6 +543,7 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_CODE_SUCCESS == code && !isSelectStar) {
code = translateExprList(pCxt, pSelect->pProjectionList);
}
// printf("%s:%d code = %d\n", __FUNCTION__, __LINE__, code);
return code;
}
......@@ -415,12 +561,21 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) {
++(pCxt->currLevel);
ESqlClause currClause = pCxt->currClause;
int32_t code = translateQuery(pCxt, pNode);
--(pCxt->currLevel);
pCxt->currClause = currClause;
return code;
}
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
STranslateContext cxt = { .pParseCxt = pParseCxt, .pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES), .currLevel = 0 };
STranslateContext cxt = {
.pParseCxt = pParseCxt,
.errCode = TSDB_CODE_SUCCESS,
.msgBuf = { .buf = pParseCxt->pMsg, .len = pParseCxt->msgLen },
.pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES),
.currLevel = 0,
.currClause = 0
};
return translateQuery(&cxt, pQuery->pRoot);
}
......@@ -38,34 +38,55 @@ protected:
}
bool run(int32_t expectCode = TSDB_CODE_SUCCESS) {
bool run(int32_t parseCode = TSDB_CODE_SUCCESS, int32_t translateCode = TSDB_CODE_SUCCESS) {
int32_t code = doParse(&cxt_, &query_);
// cout << "doParse return " << code << endl;
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] code:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl;
return (code == expectCode);
return (TSDB_CODE_SUCCESS != parseCode);
}
if (TSDB_CODE_SUCCESS != parseCode) {
return false;
}
code = doTranslate(&cxt_, &query_);
// cout << "doTranslate return " << code << endl;
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] code:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl;
return (TSDB_CODE_SUCCESS != translateCode);
}
if (NULL != query_.pRoot && QUERY_NODE_SELECT_STMT == nodeType(query_.pRoot)) {
SSelectStmt* select = (SSelectStmt*)query_.pRoot;
string sql("SELECT ");
if (select->isDistinct) {
sql.append("DISTINCT ");
}
if (nullptr == select->pProjectionList) {
sql.append("* ");
} else {
nodeListToSql(select->pProjectionList, sql);
}
sql.append("FROM ");
tableToSql(select->pFromTable, sql);
cout << sql << endl;
string sql;
selectToSql(query_.pRoot, sql);
cout << "input sql : [" << cxt_.pSql << "]" << endl;
cout << "output sql : [" << sql << "]" << endl;
}
return (code == expectCode);
return (TSDB_CODE_SUCCESS == translateCode);
}
private:
static const int max_err_len = 1024;
static const int max_sql_len = 1024 * 1024;
void selectToSql(const SNode* node, string& sql) {
SSelectStmt* select = (SSelectStmt*)node;
sql.append("SELECT ");
if (select->isDistinct) {
sql.append("DISTINCT ");
}
if (nullptr == select->pProjectionList) {
sql.append("* ");
} else {
nodeListToSql(select->pProjectionList, sql);
sql.append(" ");
}
sql.append("FROM ");
tableToSql(select->pFromTable, sql);
if (nullptr != select->pWhere) {
sql.append(" WHERE ");
nodeToSql(select->pWhere, sql);
}
}
void tableToSql(const SNode* node, string& sql) {
const STableNode* table = (const STableNode*)node;
switch (nodeType(node)) {
......@@ -78,6 +99,88 @@ private:
sql.append(realTable->table.tableName);
break;
}
case QUERY_NODE_TEMP_TABLE: {
STempTableNode* tempTable = (STempTableNode*)table;
sql.append("(");
selectToSql(tempTable->pSubquery, sql);
sql.append(") ");
sql.append(tempTable->table.tableAlias);
break;
}
case QUERY_NODE_JOIN_TABLE: {
SJoinTableNode* joinTable = (SJoinTableNode*)table;
tableToSql(joinTable->pLeft, sql);
sql.append(" JOIN ");
tableToSql(joinTable->pRight, sql);
if (nullptr != joinTable->pOnCond) {
sql.append(" ON ");
nodeToSql(joinTable->pOnCond, sql);
}
break;
}
default:
break;
}
}
string opTypeToSql(EOperatorType type) {
switch (type) {
case OP_TYPE_ADD:
return " + ";
case OP_TYPE_SUB:
return " - ";
case OP_TYPE_MULTI:
case OP_TYPE_DIV:
case OP_TYPE_MOD:
case OP_TYPE_GREATER_THAN:
case OP_TYPE_GREATER_EQUAL:
case OP_TYPE_LOWER_THAN:
case OP_TYPE_LOWER_EQUAL:
case OP_TYPE_EQUAL:
return " = ";
case OP_TYPE_NOT_EQUAL:
case OP_TYPE_IN:
case OP_TYPE_NOT_IN:
case OP_TYPE_LIKE:
case OP_TYPE_NOT_LIKE:
case OP_TYPE_MATCH:
case OP_TYPE_NMATCH:
case OP_TYPE_JSON_GET_VALUE:
case OP_TYPE_JSON_CONTAINS:
default:
break;
}
return " unknown operator ";
}
void nodeToSql(const SNode* node, string& sql) {
if (nullptr == node) {
return;
}
switch (nodeType(node)) {
case QUERY_NODE_COLUMN: {
SColumnNode* pCol = (SColumnNode*)node;
if ('\0' != pCol->dbName[0]) {
sql.append(pCol->dbName);
sql.append(".");
}
if ('\0' != pCol->tableAlias[0]) {
sql.append(pCol->tableAlias);
sql.append(".");
}
sql.append(pCol->colName);
break;
}
case QUERY_NODE_VALUE:
break;
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOp = (SOperatorNode*)node;
nodeToSql(pOp->pLeft, sql);
sql.append(opTypeToSql(pOp->opType));
nodeToSql(pOp->pRight, sql);
break;
}
default:
break;
}
......@@ -91,13 +194,8 @@ private:
sql.append(", ");
}
firstNode = false;
switch (nodeType(node)) {
case QUERY_NODE_COLUMN:
sql.append(((SColumnNode*)node)->colName);
break;
}
nodeToSql(node, sql);
}
sql.append(" ");
}
void reset() {
......@@ -125,10 +223,13 @@ TEST_F(NewParserTest, selectStar) {
bind("SELECT * FROM test.t1");
ASSERT_TRUE(run());
bind("SELECT ts FROM t1");
bind("SELECT ts, c1 FROM t1");
ASSERT_TRUE(run());
bind("SELECT ts, tag1, c1 FROM t1");
bind("SELECT ts, t.c1 FROM (SELECT * FROM t1) t");
ASSERT_TRUE(run());
bind("SELECT * FROM t1 tt1, t1 tt2 WHERE tt1.c1 = tt2.c1");
ASSERT_TRUE(run());
}
......@@ -147,3 +248,16 @@ TEST_F(NewParserTest, syntaxError) {
bind("SELECT * FROM test.t1 t WHER");
ASSERT_TRUE(run(TSDB_CODE_FAILED));
}
TEST_F(NewParserTest, semanticError) {
setDatabase("root", "test");
bind("SELECT * FROM t10");
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_FAILED));
bind("SELECT c1, c3 FROM t1");
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_FAILED));
bind("SELECT c2 FROM t1 tt1, t1 tt2 WHERE tt1.c1 = tt2.c1");
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_FAILED));
}
......@@ -561,7 +561,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp;
*dataLen = 0;
......@@ -573,7 +573,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
return TSDB_CODE_SUCCESS;
}
}
// Got data from sink
......
......@@ -1409,7 +1409,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDa
}
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) == 0) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1454,7 +1454,6 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
}
SSubQueryMsg* pMsg = calloc(1, msgSize);
memcpy(pMsg->msg, msg, msgLen);
pMsg->header.vgId = tInfo.addr.nodeId;
......@@ -1464,6 +1463,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
pMsg->taskType = TASK_TYPE_PERSISTENT;
pMsg->phyLen = msgLen;
pMsg->sqlLen = 0;
memcpy(pMsg->msg, msg, msgLen);
/*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/
tInfo.msg = pMsg;
......
......@@ -213,6 +213,12 @@ typedef struct SConnBuffer {
typedef void (*AsyncCB)(uv_async_t* handle);
typedef struct {
void* pThrd;
queue qmsg;
pthread_mutex_t mtx; // protect qmsg;
} SAsyncItem;
typedef struct {
int index;
int nAsync;
......@@ -221,7 +227,7 @@ typedef struct {
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool);
int transSendAsync(SAsyncPool* pool);
int transSendAsync(SAsyncPool* pool, queue* mq);
int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf);
......
......@@ -432,14 +432,15 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
}
}
static void clientAsyncCb(uv_async_t* handle) {
SCliThrdObj* pThrd = handle->data;
SAsyncItem* item = handle->data;
SCliThrdObj* pThrd = item->pThrd;
SCliMsg* pMsg = NULL;
queue wq;
// batch process to avoid to lock/unlock frequently
pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_MOVE(&pThrd->msg, &wq);
pthread_mutex_unlock(&pThrd->msgMtx);
pthread_mutex_lock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
pthread_mutex_unlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
......@@ -548,11 +549,11 @@ static void clientSendQuit(SCliThrdObj* thrd) {
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
msg->ctx = NULL; //
pthread_mutex_lock(&thrd->msgMtx);
QUEUE_PUSH(&thrd->msg, &msg->q);
pthread_mutex_unlock(&thrd->msgMtx);
// pthread_mutex_lock(&thrd->msgMtx);
// QUEUE_PUSH(&thrd->msg, &msg->q);
// pthread_mutex_unlock(&thrd->msgMtx);
transSendAsync(thrd->asyncPool);
transSendAsync(thrd->asyncPool, &msg->q);
// uv_async_send(thrd->cliAsync);
}
void taosCloseClient(void* arg) {
......@@ -598,14 +599,14 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
pthread_mutex_lock(&thrd->msgMtx);
QUEUE_PUSH(&thrd->msg, &cliMsg->q);
pthread_mutex_unlock(&thrd->msgMtx);
// pthread_mutex_lock(&thrd->msgMtx);
// QUEUE_PUSH(&thrd->msg, &cliMsg->q);
// pthread_mutex_unlock(&thrd->msgMtx);
int start = taosGetTimestampUs();
transSendAsync(thrd->asyncPool);
// int start = taosGetTimestampUs();
transSendAsync(thrd->asyncPool, &(cliMsg->q));
// uv_async_send(thrd->cliAsync);
int end = taosGetTimestampUs() - start;
// int end = taosGetTimestampUs() - start;
// tError("client sent to rpc, time cost: %d", (int)end);
}
#endif
......@@ -247,7 +247,7 @@ int transDestroyBuffer(SConnBuffer* buf) {
}
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
static int sz = 20;
static int sz = 10;
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
pool->index = 0;
......@@ -257,24 +257,46 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
uv_async_init(loop, async, cb);
async->data = arg;
SAsyncItem* item = calloc(1, sizeof(SAsyncItem));
item->pThrd = arg;
QUEUE_INIT(&item->qmsg);
pthread_mutex_init(&item->mtx, NULL);
async->data = item;
}
return pool;
}
void transDestroyAsyncPool(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
pthread_mutex_destroy(&item->mtx);
free(item);
}
free(pool->asyncs);
free(pool);
}
int transSendAsync(SAsyncPool* pool) {
int transSendAsync(SAsyncPool* pool, queue* q) {
int idx = pool->index;
idx = idx % pool->nAsync;
// no need mutex here
if (pool->index++ > pool->nAsync) {
pool->index = 0;
}
return uv_async_send(&(pool->asyncs[idx]));
uv_async_t* async = &(pool->asyncs[idx]);
SAsyncItem* item = async->data;
int64_t st = taosGetTimestampUs();
pthread_mutex_lock(&item->mtx);
QUEUE_PUSH(&item->qmsg, q);
pthread_mutex_unlock(&item->mtx);
int64_t el = taosGetTimestampUs() - st;
if (el > 50) {
// tInfo("lock and unlock cost: %d", (int)el);
}
return uv_async_send(async);
}
#endif
......@@ -376,13 +376,15 @@ static void destroySmsg(SSrvMsg* smsg) {
free(smsg);
}
void uvWorkerAsyncCb(uv_async_t* handle) {
SWorkThrdObj* pThrd = handle->data;
SAsyncItem* item = handle->data;
SWorkThrdObj* pThrd = item->pThrd;
SSrvConn* conn = NULL;
queue wq;
// batch process to avoid to lock/unlock frequently
pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_MOVE(&pThrd->msg, &wq);
pthread_mutex_unlock(&pThrd->msgMtx);
pthread_mutex_lock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
pthread_mutex_unlock(&item->mtx);
// pthread_mutex_unlock(&mtx);
while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq);
......@@ -539,7 +541,7 @@ static bool addHandleToAcceptloop(void* arg) {
tError("failed to bind: %s", uv_err_name(err));
return false;
}
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
tError("failed to listen: %s", uv_err_name(err));
return false;
}
......@@ -671,12 +673,12 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
pthread_mutex_unlock(&pThrd->msgMtx);
// pthread_mutex_lock(&pThrd->msgMtx);
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
// pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("send quit msg to work thread");
transSendAsync(pThrd->asyncPool);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
// uv_async_send(pThrd->workerAsync);
}
......@@ -712,12 +714,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
srvMsg->pConn = pConn;
srvMsg->msg = *pMsg;
pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
pthread_mutex_unlock(&pThrd->msgMtx);
// pthread_mutex_lock(&pThrd->msgMtx);
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
// pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
// uv_async_send(pThrd->workerAsync);
}
......
import hudson.model.Result
import hudson.model.*;
import jenkins.model.CauseOfInterruption
node {
}
def skipbuild=0
def win_stop=0
def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME
def currentBuildNumber = env.BUILD_NUMBER.toInteger()
def jobs = Jenkins.instance.getItemByFullName(currentJobName)
def builds = jobs.getBuilds()
for (build in builds) {
if (!build.isBuilding()) {
continue;
}
if (currentBuildNumber == build.getNumber().toInteger()) {
continue;
}
build.doKill() //doTerm(),doKill(),doTerm()
}
}
// abort previous build
abortPreviousBuilds()
def abort_previous(){
def buildNumber = env.BUILD_NUMBER as int
if (buildNumber > 1) milestone(buildNumber - 1)
milestone(buildNumber)
}
def pre_test(){
sh'hostname'
sh '''
sudo rmtaos || echo "taosd has not installed"
'''
sh '''
killall -9 taosd ||echo "no taosd running"
killall -9 gdb || echo "no gdb running"
killall -9 python3.8 || echo "no python program running"
cd ${WKC}
'''
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
cd ${WKC}
git checkout master
'''
}
else if(env.CHANGE_TARGET == '2.0'){
sh '''
cd ${WKC}
git checkout 2.0
'''
}
else if(env.CHANGE_TARGET == '3.0'){
sh '''
cd ${WKC}
git checkout 3.0
'''
}
else{
sh '''
cd ${WKC}
git checkout develop
'''
}
}
sh'''
cd ${WKC}
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
export TZ=Asia/Harbin
date
rm -rf debug
mkdir debug
cd debug
cmake .. > /dev/null
make -j4> /dev/null
'''
return 1
}
pipeline {
agent none
options { skipDefaultCheckout() }
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine'
}
stages {
stage('pre_build'){
agent{label 'slave3_0'}
options { skipDefaultCheckout() }
when {
changeRequest()
}
steps {
script{
abort_previous()
abortPreviousBuilds()
}
timeout(time: 45, unit: 'MINUTES'){
pre_test()
sh'''
cd ${WKC}/tests
./test-all.sh b1fq
'''
sh'''
cd ${WKC}/debug
ctest
'''
}
}
}
}
post {
success {
emailext (
subject: "PR-result: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' SUCCESS",
body: """<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${env.BRANCH_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${env.CHANGE_AUTHOR}</li>
<li>提交信息:${env.CHANGE_TITLE}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>""",
to: "${env.CHANGE_AUTHOR_EMAIL}",
from: "support@taosdata.com"
)
}
failure {
emailext (
subject: "PR-result: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' FAIL",
body: """<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${env.BRANCH_NAME}</li>
<li>构建结果:<span style="color:red"> Failure </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${env.CHANGE_AUTHOR}</li>
<li>提交信息:${env.CHANGE_TITLE}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>""",
to: "${env.CHANGE_AUTHOR_EMAIL}",
from: "support@taosdata.com"
)
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册