提交 06b9caa5 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into test/chr/TD-14699

......@@ -15,12 +15,12 @@ MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
find_package(Git QUIET)
if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git")
if(GIT_FOUND AND EXISTS "${TD_SOURCE_DIR}/.git")
# Update submodules as needed
option(GIT_SUBMODULE "Check submodules during build" ON)
if(GIT_SUBMODULE)
message(STATUS "Submodule update")
execute_process(COMMAND ${GIT_EXECUTABLE} submodule update --init --recursive
execute_process(COMMAND cd ${TD_SOURCE_DIR} && ${GIT_EXECUTABLE} submodule update --init --recursive
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
RESULT_VARIABLE GIT_SUBMOD_RESULT)
if(NOT GIT_SUBMOD_RESULT EQUAL "0")
......@@ -29,7 +29,7 @@ if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git")
endif()
endif()
if(NOT EXISTS "${PROJECT_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt")
if(NOT EXISTS "${TD_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt")
message(WARNING "The submodules were not downloaded! GIT_SUBMODULE was turned off or failed. Please update submodules manually if you need build them.")
endif()
......
......@@ -83,7 +83,7 @@ int32_t create_stream() {
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(
pConn,
"create stream stream1 trigger window_close into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
"create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
"from tu1 interval(10m)");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
......
......@@ -221,15 +221,12 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
#if 0
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
#endif
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
......@@ -240,6 +237,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t {
......@@ -268,12 +266,9 @@ DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
#endif
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql);
#endif
/* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
#endif
......
......@@ -45,7 +45,6 @@ extern "C" {
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "smas"
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes"
#define TSDB_PERFS_TABLE_CONNECTIONS "connections"
#define TSDB_PERFS_TABLE_QUERIES "queries"
#define TSDB_PERFS_TABLE_TOPICS "topics"
......
......@@ -99,7 +99,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_VGROUP,
TSDB_MGMT_TABLE_TOPICS,
TSDB_MGMT_TABLE_CONSUMERS,
TSDB_MGMT_TABLE_SUBSCRIBES,
TSDB_MGMT_TABLE_SUBSCRIPTIONS,
TSDB_MGMT_TABLE_TRANS,
TSDB_MGMT_TABLE_SMAS,
TSDB_MGMT_TABLE_CONFIGS,
......@@ -131,12 +131,10 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_USER_SUPERUSER 0x2
#define TSDB_ALTER_USER_ADD_READ_DB 0x3
#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4
#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7
#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8
#define TSDB_ALTER_USER_ADD_ALL_DB 0x9
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0xA
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x5
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x6
#define TSDB_ALTER_USER_ADD_ALL_DB 0x7
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0x8
#define TSDB_ALTER_USER_PRIVILEGES 0x2
......
......@@ -80,6 +80,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count);
void taosFprintfFile(TdFilePtr pFile, const char *format, ...);
int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict ptrBuf);
int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf);
int32_t taosEOFFile(TdFilePtr pFile);
......
......@@ -86,10 +86,9 @@ extern const int32_t TYPE_BYTES[15];
#define TS_PATH_DELIMITER "."
#define TS_ESCAPE_CHAR '`'
#define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TIME_PRECISION_HOURS 3
#define TSDB_TIME_PRECISION_MINUTES 4
#define TSDB_TIME_PRECISION_SECONDS 5
......@@ -249,7 +248,6 @@ typedef enum ELogicConditionType {
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_SHOW_LIST_LEN 1000
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
......@@ -376,9 +374,9 @@ typedef enum ELogicConditionType {
* 1. ordinary sub query for select * from super_table
* 2. all sqlobj generated by createSubqueryObj with this flag
*/
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
......
......@@ -395,7 +395,7 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
}
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
tmq_list_append(*topics, topic->topicName);
tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
}
return TMQ_RESP_ERR__SUCCESS;
}
......
......@@ -14,9 +14,9 @@
*/
#include "systable.h"
#include "taos.h"
#include "tdef.h"
#include "types.h"
#include "taos.h"
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
......@@ -265,7 +265,7 @@ static const SSysDbTableSchema consumerSchema[] = {
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
......@@ -274,8 +274,8 @@ static const SSysDbTableSchema consumerSchema[] = {
};
static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
};
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
#include "libs/function/function.h"
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); }
......@@ -29,6 +30,9 @@ static void smClose(SMgmtWrapper *pWrapper) {
if (pMgmt == NULL) return;
dInfo("snode-mgmt start to cleanup");
udfcClose();
if (pMgmt->pSnode != NULL) {
smStopWorker(pMgmt);
sndClose(pMgmt->pSnode);
......@@ -68,6 +72,10 @@ int32_t smOpen(SMgmtWrapper *pWrapper) {
}
dmReportStartup(pWrapper->pDnode, "snode-worker", "initialized");
if (udfcOpen() != 0) {
dError("failed to open udfc in snode");
}
return 0;
}
......
......@@ -324,7 +324,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
dmReportStartup(pDnode, "vnode-vnodes", "initialized");
if (udfcOpen() != 0) {
dError("failed to open udfc in dnode");
dError("failed to open udfc in vnode");
}
code = 0;
......
......@@ -198,6 +198,9 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp.refId = pMsg->rpcMsg.refId;
tmsgSendRsp(&rsp);
}
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
}
......@@ -211,6 +214,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
// todo
SRpcMsg *pRsp = NULL;
(void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
}
......
......@@ -26,7 +26,7 @@ int32_t mndInitAuth(SMnode *pMnode);
void mndCleanupAuth(SMnode *pMnode);
int32_t mndCheckCreateUserAuth(SUserObj *pOperUser);
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter);
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
int32_t mndCheckDropUserAuth(SUserObj *pOperUser);
int32_t mndCheckNodeAuth(SUserObj *pOperUser);
......
......@@ -33,6 +33,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
#ifdef __cplusplus
}
#endif
......
......@@ -79,14 +79,12 @@ int32_t mndCheckCreateUserAuth(SUserObj *pOperUser) {
return -1;
}
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter) {
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter) {
if (pAlter->alterType == TSDB_ALTER_USER_PASSWD) {
if (pOperUser->superUser || strcmp(pUser->user, pOperUser->user) == 0) {
return 0;
}
}
if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) {
} else if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
......@@ -95,21 +93,12 @@ int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb,
if (pOperUser->superUser) {
return 0;
}
}
if (pAlter->alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_CLEAR_READ_DB) {
} else {
if (pOperUser->superUser) {
return 0;
}
}
if (pAlter->alterType == TSDB_ALTER_USER_ADD_READ_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_READ_DB ||
pAlter->alterType == TSDB_ALTER_USER_ADD_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) {
if (pOperUser->superUser || strcmp(pUser->user, pDb->createUser) == 0) {
return 0;
}
}
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
......
......@@ -790,71 +790,83 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
if (pShow->pIter == NULL) break;
SColumnInfoData *pColInfo;
int32_t cols = 0;
taosRLockLatch(&pConsumer->lock);
// consumer id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
// group id
char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN);
varDataSetLen(groupId, strlen(varDataVal(groupId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)groupId, false);
// app id
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
varDataSetLen(appId, strlen(varDataVal(appId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
varDataSetLen(status, strlen(varDataVal(status)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
// subscribed topics
// TODO: split into multiple rows
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
char *showStr = taosShowStrArray(pConsumer->assignedTopics);
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
taosMemoryFree(showStr);
varDataSetLen(topics, strlen(varDataVal(topics)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topics, false);
// pid
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
// end point
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
// up time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
bool hasTopic = true;
if (topicSz == 0) {
hasTopic = false;
topicSz = 1;
}
for (int32_t i = 0; i < topicSz; i++) {
if (numOfRows + topicSz > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
}
SColumnInfoData *pColInfo;
int32_t cols = 0;
// consumer id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
// group id
char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN);
varDataSetLen(groupId, strlen(varDataVal(groupId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)groupId, false);
// app id
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
varDataSetLen(appId, strlen(varDataVal(appId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
varDataSetLen(status, strlen(varDataVal(status)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
// one subscribed topic
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (hasTopic) {
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN);
varDataSetLen(topic, strlen(varDataVal(topic)));
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
} else {
colDataAppend(pColInfo, numOfRows, NULL, true);
}
// pid
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
// end point
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
// up time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
numOfRows++;
}
taosRUnLockLatch(&pConsumer->lock);
sdbRelease(pSdb, pConsumer);
numOfRows++;
}
pShow->numOfRows += numOfRows;
......
......@@ -85,8 +85,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_VGROUP;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) {
type = TSDB_MGMT_TABLE_CONSUMERS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIBES, len) == 0) {
type = TSDB_MGMT_TABLE_SUBSCRIBES;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIPTIONS, len) == 0) {
type = TSDB_MGMT_TABLE_SUBSCRIPTIONS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) {
type = TSDB_MGMT_TABLE_TRANS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) {
......
......@@ -259,7 +259,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1;
}
#if 1
#if 0
printf("|");
for (int i = 0; i < pStream->outputSchema.nCols; i++) {
printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name);
......
......@@ -44,6 +44,9 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs
static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg);
static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
if (pRedoRaw == NULL) return -1;
......@@ -71,6 +74,10 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -706,3 +713,124 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
END:
return code;
}
static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SMqSubscribeObj *pSub = NULL;
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
if (pShow->pIter == NULL) break;
taosRLockLatch(&pSub->lock);
if (numOfRows + pSub->vgNum > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
}
SMqConsumerEp *pConsumerEp = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < sz; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
SColumnInfoData *pColInfo;
int32_t cols = 0;
// topic and cgroup
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup);
varDataSetLen(topic, strlen(varDataVal(topic)));
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// vg id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
// consumer id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
// offset
#if 0
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif
numOfRows++;
}
}
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
SColumnInfoData *pColInfo;
int32_t cols = 0;
// topic and cgroup
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup);
varDataSetLen(topic, strlen(varDataVal(topic)));
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// vg id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
// consumer id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, NULL, true);
// offset
#if 0
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif
numOfRows++;
}
taosRUnLockLatch(&pSub->lock);
sdbRelease(pSdb, pSub);
}
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
......@@ -35,6 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
......@@ -61,6 +62,11 @@ int32_t mndInitTopic(SMnode *pMnode) {
void mndCleanupTopic(SMnode *pMnode) {}
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
//
return strchr(topic, '.') + 1;
}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -394,6 +394,8 @@ static SHashObj *mndDupDbHash(SHashObj *pOld) {
static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
int32_t code = -1;
SUserObj *pUser = NULL;
SUserObj *pOperUser = NULL;
......@@ -429,7 +431,13 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
goto _OVER;
}
if (mndCheckAlterUserAuth(pOperUser, pUser, &alterReq) != 0) {
goto _OVER;
}
memcpy(&newUser, pUser, sizeof(SUserObj));
newUser.authVersion++;
newUser.updateTime = taosGetTimestampMs();
taosRLockLatch(&pUser->lock);
newUser.readDbs = mndDupDbHash(pUser->readDbs);
......@@ -440,63 +448,90 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
goto _OVER;
}
int32_t len = strlen(alterReq.dbname) + 1;
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
mndReleaseDb(pMnode, pDb);
if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) {
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass);
memcpy(newUser.pass, pass, TSDB_PASSWORD_LEN);
} else if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) {
}
if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) {
newUser.superUser = alterReq.superUser;
} else if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB) {
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto _OVER;
}
if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
newUser.authVersion++;
} else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) {
if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto _OVER;
}
newUser.authVersion++;
} else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) {
taosHashClear(newUser.readDbs);
newUser.authVersion++;
} else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) {
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto _OVER;
}
if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) {
if (strcmp(alterReq.dbname, "*") != 0) {
int32_t len = strlen(alterReq.dbname) + 1;
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
if (pDb == NULL) {
mndReleaseDb(pMnode, pDb);
goto _OVER;
}
if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
mndReleaseDb(pMnode, pDb);
goto _OVER;
}
} else {
while (1) {
SDbObj *pDb = NULL;
pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb);
if (pIter == NULL) break;
int32_t len = strlen(pDb->name) + 1;
taosHashPut(newUser.readDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN);
sdbRelease(pSdb, pDb);
}
}
newUser.authVersion++;
} else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) {
if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto _OVER;
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) {
if (strcmp(alterReq.dbname, "*") != 0) {
int32_t len = strlen(alterReq.dbname) + 1;
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
if (pDb == NULL) {
mndReleaseDb(pMnode, pDb);
goto _OVER;
}
if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
mndReleaseDb(pMnode, pDb);
goto _OVER;
}
} else {
while (1) {
SDbObj *pDb = NULL;
pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb);
if (pIter == NULL) break;
int32_t len = strlen(pDb->name) + 1;
taosHashPut(newUser.writeDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN);
sdbRelease(pSdb, pDb);
}
}
newUser.authVersion++;
} else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) {
taosHashClear(newUser.writeDbs);
newUser.authVersion++;
} else {
terrno = TSDB_CODE_MND_INVALID_ALTER_OPER;
goto _OVER;
}
newUser.updateTime = taosGetTimestampMs();
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) {
if (strcmp(alterReq.dbname, "*") != 0) {
int32_t len = strlen(alterReq.dbname) + 1;
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
if (pDb == NULL) {
mndReleaseDb(pMnode, pDb);
goto _OVER;
}
taosHashRemove(newUser.readDbs, alterReq.dbname, len);
} else {
taosHashClear(newUser.readDbs);
}
}
if (mndCheckAlterUserAuth(pOperUser, pUser, pDb, &alterReq) != 0) {
goto _OVER;
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) {
if (strcmp(alterReq.dbname, "*") != 0) {
int32_t len = strlen(alterReq.dbname) + 1;
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
if (pDb == NULL) {
mndReleaseDb(pMnode, pDb);
goto _OVER;
}
taosHashRemove(newUser.writeDbs, alterReq.dbname, len);
} else {
taosHashClear(newUser.writeDbs);
}
}
code = mndAlterUser(pMnode, pUser, &newUser, pReq);
......
......@@ -238,9 +238,10 @@ TEST_F(MndTestUser, 03_Alter_User) {
{
SAlterUserReq alterReq = {0};
alterReq.alterType = TSDB_ALTER_USER_CLEAR_WRITE_DB;
alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB;
strcpy(alterReq.user, "u3");
strcpy(alterReq.pass, "1");
strcpy(alterReq.dbname, "*");
int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq);
void* pReq = rpcMallocCont(contLen);
......@@ -253,9 +254,10 @@ TEST_F(MndTestUser, 03_Alter_User) {
{
SAlterUserReq alterReq = {0};
alterReq.alterType = TSDB_ALTER_USER_CLEAR_READ_DB;
alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB;
strcpy(alterReq.user, "u3");
strcpy(alterReq.pass, "1");
strcpy(alterReq.dbname, "*");
int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq);
void* pReq = rpcMallocCont(contLen);
......
......@@ -879,14 +879,14 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
}
}
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) {
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
TDRowVerT maxVer) {
STSRow *rmem = NULL, *rimem = NULL;
if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) {
rmem = (STSRow*)SL_GET_NODE_DATA(node);
#if 0 // TODO: skiplist refactor
#if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rmem) > maxVer) {
rmem = NULL;
}
......@@ -898,7 +898,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) {
rimem = (STSRow*)SL_GET_NODE_DATA(node);
#if 0 // TODO: skiplist refactor
#if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rimem) > maxVer) {
rimem = NULL;
}
......@@ -1677,7 +1677,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
colIdOfRow2 = tdKvRowColIdAt(row2, k);
}
if (colIdOfRow1 < colIdOfRow2) { // the most probability
if (colIdOfRow1 < colIdOfRow2) { // the most probability
if (colIdOfRow1 < pColInfo->info.colId) {
++j;
continue;
......@@ -1720,7 +1720,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
++(*curRow);
}
++nResult;
} else if (update){
} else if (update) {
mergeOption = 2;
} else {
mergeOption = 0;
......@@ -1741,7 +1741,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
++(*curRow);
}
++nResult;
} else if(update) {
} else if (update) {
mergeOption = 2;
} else {
mergeOption = 0;
......@@ -2018,9 +2018,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
rv2 = TD_ROW_SVER(row2);
}
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// numOfRows += 1;
numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
}
......@@ -2028,7 +2028,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->win.ekey = key;
cur->lastKey = key + step;
cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
#if 0
......@@ -2064,7 +2063,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
#endif
if (TD_SUPPORT_UPDATE(pCfg->update)) {
doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
if (lastKeyAppend != key) {
lastKeyAppend = key;
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
......@@ -2074,10 +2077,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2);
}
numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// ++numOfRows;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
}
......@@ -2117,15 +2120,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
lastKeyAppend = tsArray[qend];
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
if ((lastKeyAppend != TSKEY_INITIAL_VAL) &&
(lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
pos += (qend - qstart + 1) * step;
if(numOfRows > 0) {
if (numOfRows > 0) {
curRow = numOfRows - 1;
}
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
cur->lastKey = cur->win.ekey + step;
lastKeyAppend = cur->win.ekey;
}
} while (numOfRows < pTsdbReadHandle->outputCapacity);
......@@ -2429,7 +2437,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
STimeWindow win = TSWINDOW_INITIALIZER;
STimeWindow win = TSWINDOW_INITIALIZER;
while (true) {
tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
......@@ -2735,7 +2743,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
STSchema* pSchema = NULL;
TSKEY lastRowKey = TSKEY_INITIAL_VAL;
do {
STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
if (row == NULL) {
......@@ -2760,8 +2767,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
rv = TD_ROW_SVER(row);
}
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
NULL, pCfg->update, &lastRowKey);
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
pSchema, NULL, pCfg->update, &lastRowKey);
if (numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo);
......@@ -2770,7 +2777,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} while (moveToNextRowInMem(pCheckInfo));
taosMemoryFreeClear(pSchema); // free the STSChema
taosMemoryFreeClear(pSchema); // free the STSChema
assert(numOfRows <= maxRowsToRead);
......@@ -2898,8 +2905,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// if (ret != TSDB_CODE_SUCCESS) {
// return false;
// }
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, pCheckInfo->tableId,
NULL, NULL, true, &lastRowKey);
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols,
pCheckInfo->tableId, NULL, NULL, true, &lastRowKey);
taosMemoryFreeClear(pRow);
// update the last key value
......@@ -3468,7 +3475,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win;
// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
}
/*
......
......@@ -1259,7 +1259,9 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
return false;
}
UdfcFuncHandle handle;
if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
int32_t udfCode = 0;
if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode);
return false;
}
SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
......@@ -1272,7 +1274,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
udfRes->session = (SClientUdfUvSession *)handle;
SUdfInterBuf buf = {0};
if (callUdfAggInit(handle, &buf) != 0) {
if ((udfCode = callUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode);
return false;
}
udfRes->interResNum = buf.numOfResult;
......@@ -1316,21 +1319,23 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
.numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
callUdfAggProcess(session, inputBlock, &state, &newState);
udfRes->interResNum = newState.numOfResult;
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState);
if (udfCode != 0) {
fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0;
} else {
udfRes->interResNum = newState.numOfResult;
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
}
if (newState.numOfResult == 1 || state.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock);
taosMemoryFree(newState.buf);
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
......@@ -1344,15 +1349,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
callUdfAggFinalize(session, &state, &resultBuf);
udfRes->finalResBuf = resultBuf.buf;
udfRes->finalResNum = resultBuf.numOfResult;
teardownUdf(session);
int32_t udfCallCode= 0;
udfCallCode= callUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode!= 0) {
fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0;
} else {
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
}
if (resultBuf.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1;
int32_t code = teardownUdf(session);
if (code != 0) {
fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code);
}
return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
}
\ No newline at end of file
......@@ -347,10 +347,21 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
if (fmIsUserDefinedFunc(node->funcId)) {
UdfcFuncHandle udfHandle = NULL;
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
code = setupUdf(node->functionName, &udfHandle);
if (code != 0) {
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
goto _return;
}
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
teardownUdf(udfHandle);
SCL_ERR_JRET(code);
if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
}
code = teardownUdf(udfHandle);
if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
}
} else {
SScalarFuncExecFuncs ffpSet = {0};
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
......
......@@ -822,7 +822,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
varDataSetLen(output, len);
}
//for constant conversion, need to set proper length of pOutput description
if (len < outputLen - VARSTR_HEADER_SIZE) {
if (len < outputLen) {
pOutput->columnData->info.bytes = len;
}
break;
......
......@@ -65,6 +65,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
newCommitIndex = index;
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
newCommitIndex, pSyncNode->commitIndex);
syncEntryDestory(pEntry);
break;
} else {
sTrace(
......@@ -72,6 +74,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
"pSyncNode->pRaftStore->currentTerm:%lu",
pEntry->term, pSyncNode->pRaftStore->currentTerm);
}
syncEntryDestory(pEntry);
}
}
......
......@@ -853,12 +853,13 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
taosThreadOnce(&transModuleInit, uvInitEnv);
transSrvInst++;
char pipeName[64];
assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId());
char pipeName[64];
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
#else
snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId());
char pipeName[PATH_MAX] = {0};
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId());
#endif
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
......@@ -871,20 +872,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
thrd->pTransInst = shandle;
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
// #ifdef WINDOWS
// uv_file fds[2];
// if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) {
// #else
// uv_os_sock_t fds[2];
// if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
// #endif
// goto End;
// }
// uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
// uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
// thrd->fd = fds[0];
thrd->pipe = &(srv->pipe[i][1]); // init read
if (false == addHandleToWorkloop(thrd,pipeName)) {
......
......@@ -55,15 +55,15 @@ int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
}
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
int code = 0;
int ret = 0;
TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
TdFilePtr pLogTFile = pRead->pReadLogTFile;
// seek position
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
code = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
if (code < 0) {
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -72,14 +72,14 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
// TODO:deserialize
ASSERT(entry.ver == ver);
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (code < 0) {
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return code;
return ret;
}
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
......@@ -108,7 +108,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
}
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
int code;
SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) {
return 0;
......@@ -126,16 +125,15 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
if (pRead->curFileFirstVer != pRet->firstVer) {
code = walReadChangeFile(pRead, pRet->firstVer);
if (code < 0) {
if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
return -1;
}
}
code = walReadSeekFilePos(pRead, pRet->firstVer, ver);
if (code < 0) {
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
return -1;
}
pRead->curVersion = ver;
return 0;
......@@ -246,8 +244,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
// TODO: check wal life
if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if (code < 0) {
if (walReadSeekVer(pRead, ver) < 0) {
return -1;
}
}
......@@ -278,8 +275,12 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead->capacity = pRead->pHead->head.bodyLen;
}
if (pRead->pHead->head.bodyLen !=
taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) {
if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
......
......@@ -26,7 +26,6 @@ typedef struct TdDirEntry {
WIN32_FIND_DATA findFileData;
} TdDirEntry;
typedef struct TdDir {
TdDirEntry dirEntry;
HANDLE hFind;
......@@ -59,7 +58,7 @@ void wordfree(wordexp_t *pwordexp) {}
#include <wordexp.h>
typedef struct dirent dirent;
typedef struct DIR TdDir;
typedef struct DIR TdDir;
typedef struct dirent TdDirEntry;
#endif
......@@ -78,14 +77,14 @@ void taosRemoveDir(const char *dirname) {
taosRemoveDir(filename);
} else {
(void)taosRemoveFile(filename);
//printf("file:%s is removed\n", filename);
// printf("file:%s is removed\n", filename);
}
}
taosCloseDir(&pDir);
rmdir(dirname);
//printf("dir:%s is removed\n", dirname);
// printf("dir:%s is removed\n", dirname);
return;
}
......@@ -102,8 +101,8 @@ int32_t taosMkDir(const char *dirname) {
int32_t taosMulMkDir(const char *dirname) {
if (dirname == NULL) return -1;
char *temp = strdup(dirname);
char *pos = temp;
char * temp = strdup(dirname);
char * pos = temp;
int32_t code = 0;
if (strncmp(temp, "/", 1) == 0) {
......@@ -111,8 +110,8 @@ int32_t taosMulMkDir(const char *dirname) {
} else if (strncmp(temp, "./", 2) == 0) {
pos += 2;
}
for ( ; *pos != '\0'; pos++) {
for (; *pos != '\0'; pos++) {
if (*pos == '/') {
*pos = '\0';
code = mkdir(temp, 0755);
......@@ -123,7 +122,7 @@ int32_t taosMulMkDir(const char *dirname) {
*pos = '/';
}
}
if (*(pos - 1) != '/') {
code = mkdir(temp, 0755);
if (code < 0 && errno != EEXIST) {
......@@ -145,7 +144,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) {
TdDirPtr pDir = taosOpenDir(dirname);
if (pDir == NULL) return;
int64_t sec = taosGetTimestampSec();
int64_t sec = taosGetTimestampSec();
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
......@@ -173,9 +172,9 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) {
int32_t days = (int32_t)(TABS(sec - fileSec) / 86400 + 1);
if (days > keepDays) {
(void)taosRemoveFile(filename);
//printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays);
// printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays);
} else {
//printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays);
// printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays);
}
}
}
......@@ -187,7 +186,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) {
int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) {
wordexp_t full_path;
if (0 != wordexp(dirname, &full_path, 0)) {
//printf("failed to expand path:%s since %s", dirname, strerror(errno));
// printf("failed to expand path:%s since %s", dirname, strerror(errno));
wordfree(&full_path);
return -1;
}
......@@ -228,9 +227,9 @@ bool taosIsDir(const char *dirname) {
return false;
}
char* taosDirName(char *name) {
char *taosDirName(char *name) {
#ifdef WINDOWS
char Drive1[MAX_PATH], Dir1[MAX_PATH];
char Drive1[MAX_PATH], Dir1[MAX_PATH];
_splitpath(name, Drive1, Dir1, NULL, NULL);
size_t dirNameLen = strlen(Drive1) + strlen(Dir1);
if (dirNameLen > 0) {
......@@ -242,13 +241,13 @@ char* taosDirName(char *name) {
#endif
}
char* taosDirEntryBaseName(char *name) {
char *taosDirEntryBaseName(char *name) {
#ifdef WINDOWS
char Filename1[MAX_PATH], Ext1[MAX_PATH];
_splitpath(name, NULL, NULL, Filename1, Ext1);
return name + (strlen(name) - strlen(Filename1) - strlen(Ext1));
#else
return (char*)basename(name);
return (char *)basename(name);
#endif
}
......@@ -258,8 +257,8 @@ TdDirPtr taosOpenDir(const char *dirname) {
}
#ifdef WINDOWS
char szFind[MAX_PATH]; //这是要找的
HANDLE hFind;
char szFind[MAX_PATH]; //这是要找的
HANDLE hFind;
TdDirPtr pDir = taosMemoryMalloc(sizeof(TdDir));
......@@ -275,7 +274,6 @@ TdDirPtr taosOpenDir(const char *dirname) {
#else
return (TdDirPtr)opendir(dirname);
#endif
}
TdDirEntryPtr taosReadDir(TdDirPtr pDir) {
......@@ -286,9 +284,9 @@ TdDirEntryPtr taosReadDir(TdDirPtr pDir) {
if (!FindNextFile(pDir->hFind, &(pDir->dirEntry.findFileData))) {
return NULL;
}
return (TdDirEntryPtr)&(pDir->dirEntry.findFileData);
return (TdDirEntryPtr) & (pDir->dirEntry.findFileData);
#else
return (TdDirEntryPtr)readdir((DIR*)pDir);
return (TdDirEntryPtr)readdir((DIR *)pDir);
#endif
}
......@@ -299,18 +297,18 @@ bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry) {
#ifdef WINDOWS
return (pDirEntry->findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0;
#else
return (((dirent*)pDirEntry)->d_type & DT_DIR) != 0;
return (((dirent *)pDirEntry)->d_type & DT_DIR) != 0;
#endif
}
char* taosGetDirEntryName(TdDirEntryPtr pDirEntry) {
char *taosGetDirEntryName(TdDirEntryPtr pDirEntry) {
if (pDirEntry == NULL) {
return NULL;
}
#ifdef WINDOWS
return pDirEntry->findFileData.cFileName;
#else
return ((dirent*)pDirEntry)->d_name;
return ((dirent *)pDirEntry)->d_name;
#endif
}
......@@ -324,7 +322,7 @@ int32_t taosCloseDir(TdDirPtr *ppDir) {
*ppDir = NULL;
return 0;
#else
closedir((DIR*)*ppDir);
closedir((DIR *)*ppDir);
*ppDir = NULL;
return 0;
#endif
......
......@@ -688,6 +688,16 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) {
return getline(ptrBuf, &len, pFile->fp);
#endif
}
int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf) {
if (pFile == NULL || buf == NULL ) {
return -1;
}
assert(pFile->fp != NULL);
if (fgets(buf, maxSize, pFile->fp) == NULL) {
return -1;
}
return strlen(buf);
}
int32_t taosEOFFile(TdFilePtr pFile) {
if (pFile == NULL) {
return 0;
......
......@@ -28,6 +28,7 @@
#else
#include <arpa/inet.h>
#include <fcntl.h>
#include <net/if.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
......@@ -638,6 +639,73 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
return 0;
}
int taosGetLocalIp(const char *eth, char *ip) {
#if defined(WINDOWS)
// DO NOTHAING
return 0;
#else
int fd;
struct ifreq ifr;
struct sockaddr_in sin;
fd = socket(AF_INET, SOCK_DGRAM, 0);
if (-1 == fd) {
return -1;
}
strncpy(ifr.ifr_name, eth, IFNAMSIZ);
ifr.ifr_name[IFNAMSIZ - 1] = 0;
if (ioctl(fd, SIOCGIFADDR, &ifr) < 0) {
taosCloseSocketNoCheck1(fd);
return -1;
}
memcpy(&sin, &ifr.ifr_addr, sizeof(sin));
snprintf(ip, 64, "%s", inet_ntoa(sin.sin_addr));
taosCloseSocketNoCheck1(fd);
#endif
return 0;
}
int taosValidIp(uint32_t ip) {
#if defined(WINDOWS)
// DO NOTHAING
return 0;
#else
int ret = -1;
int fd;
struct ifconf ifconf;
char buf[512] = {0};
ifconf.ifc_len = 512;
ifconf.ifc_buf = buf;
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
return -1;
}
ioctl(fd, SIOCGIFCONF, &ifconf);
struct ifreq *ifreq = (struct ifreq *)ifconf.ifc_buf;
for (int i = (ifconf.ifc_len / sizeof(struct ifreq)); i > 0; i--) {
char ip_str[64] = {0};
if (ifreq->ifr_flags == AF_INET) {
ret = taosGetLocalIp(ifreq->ifr_name, ip_str);
if (ret != 0) {
break;
}
ret = -1;
if (ip == (uint32_t)taosInetAddr(ip_str)) {
ret = 0;
break;
}
ifreq++;
}
}
taosCloseSocketNoCheck1(fd);
return ret;
#endif
return 0;
}
bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
SocketFd fd;
......@@ -677,13 +745,9 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
taosCloseSocket(&pSocket);
return false;
}
if (listen(pSocket->fd, 1024) < 0) {
// printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(&pSocket);
return false;
}
taosCloseSocket(&pSocket);
return true;
// return 0 == taosValidIp(ip) ? true : false;
}
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
......
......@@ -5,7 +5,7 @@ set +e
echo "Executing copy_udf.sh"
SCRIPT_DIR=`dirname $0`
SCRIPT_DIR=`pwd`
cd $SCRIPT_DIR/../
IN_TDINTERNAL="community"
......@@ -23,11 +23,12 @@ echo $UDF1_DIR
echo $UDF2_DIR
UDF_TMP=/tmp/udf
rm -rf $UDF_TMP
mkdir $UDF_TMP
rm $UDF_TMP/libudf1.so
rm $UDF_TMP/libudf2.so
echo "Copy udf shared library files to $UDF_TMP"
cp $UDF1_DIR $UDF_TMP
cp $UDF1_DIR $UDF_TMP
echo "copy udf1 result: $?"
cp $UDF2_DIR $UDF_TMP
echo "copy udf2 result: $?"
......@@ -62,11 +62,7 @@ fi
TOP_DIR=`pwd`
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2,3`
else
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2`
fi
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2`
declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR
......
......@@ -33,7 +33,7 @@ if $rows != 3 then
return -1
endi
sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
sql create stream s1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
sql show stables
if $rows != 2 then
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== show users
sql create database d1 vgroups 1;
sql create database d2 vgroups 1;
sql create database d3 vgroups 1;
sql show databases
if $rows != 5 then
return -1
endi
print =============== create users
sql create user user1 PASS 'user1'
sql create user user2 PASS 'user2'
sql show users
if $rows != 3 then
return -1
endi
print =============== test read
sql_error GRANT read ON d1.* to a;
sql_error GRANT read ON d0.* to user1;
sql GRANT read ON d1.* to user1;
sql GRANT read ON d2.* to user1;
sql GRANT read ON *.* to user1;
sql REVOKE read ON d1.* from user1;
sql REVOKE read ON d2.* from user1;
sql REVOKE read ON *.* from user1;
print =============== test write
sql_error GRANT write ON d1.* to a;
sql_error GRANT write ON d0.* to user1;
sql GRANT write ON d1.* to user1;
sql GRANT write ON d2.* to user1;
sql GRANT write ON *.* to user1;
sql REVOKE write ON d1.* from user1;
sql REVOKE write ON d2.* from user1;
sql REVOKE write ON *.* from user1;
print =============== test all
sql_error GRANT all ON d1.* to a;
sql_error GRANT all ON d0.* to user1;
sql GRANT all ON d1.* to user1;
sql GRANT all ON d2.* to user1;
sql GRANT all ON *.* to user1;
sql REVOKE all ON d1.* from user1;
sql REVOKE all ON d2.* from user1;
sql REVOKE all ON *.* from user1;
print =============== test read write
sql_error GRANT read,write ON d1.* to a;
sql_error GRANT read,write ON d0.* to user1;
sql GRANT read,write ON d1.* to user1;
sql GRANT read,write ON d2.* to user1;
sql GRANT read,write ON *.* to user1;
sql REVOKE read,write ON d1.* from user1;
sql REVOKE read,write ON d2.* from user1;
sql REVOKE read,write ON *.* from user1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -20,7 +20,7 @@ class TDTestCase:
__sql = f"select cast({col_name} as bigint), {col_name} from {tbname}"
tdSql.query(sql=__sql)
data_tb_col = [result[1] for result in tdSql.queryResult]
for i in range(len(tdSql.queryRows)):
for i in range(tdSql.queryRows):
tdSql.checkData( i, 0, None ) if data_tb_col[i] is None else tdSql.checkData( i, 0, int(data_tb_col[i]) )
def __range_to_bigint(self,cols,tables):
......@@ -32,7 +32,7 @@ class TDTestCase:
__sql = f"select cast({col_name} as timestamp), {col_name} from {tbname}"
tdSql.query(sql=__sql)
data_tb_col = [result[1] for result in tdSql.queryResult]
for i in range(len(tdSql.queryRows)):
for i in range(tdSql.queryRows):
if data_tb_col[i] is None:
tdSql.checkData( i, 0 , None )
if col_name not in ["c2", "double"] or tbname != "t1" or i != 10:
......@@ -597,37 +597,37 @@ class TDTestCase:
tdLog.printNoPrefix("==========step39: cast constant operation to bigint, expect change to int ")
tdSql.query("select cast(12121.23323131 as bigint) as b from ct4")
( tdSql.checkData(i, 0, 12121) for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, 12121) for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 as binary(16)) as b from ct4")
( tdSql.checkData(i, 0, '12121.233231') for i in range(len(tdSql.queryRows)) )
( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 as binary(2)) as b from ct4")
( tdSql.checkData(i, 0, '12') for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 as nchar(16)) as b from ct4")
( tdSql.checkData(i, 0, '12121.233231') for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 as nchar(2)) as b from ct4")
( tdSql.checkData(i, 0, '12') for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 321.876897998 as bigint) as b from ct4")
( tdSql.checkData(i, 0, 12443) for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, 12443) for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 321.876897998 as binary(16)) as b from ct4")
( tdSql.checkData(i, 0, '12443.110129') for i in range(len(tdSql.queryRows)) )
( tdSql.checkData(i, 0, '12443.110129') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 321.876897998 as binary(3)) as b from ct4")
( tdSql.checkData(i, 0, '124') for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, '124') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 321.876897998 as nchar(16)) as b from ct4")
( tdSql.checkData(i, 0, '12443.110129') for i in range(len(tdSql.queryRows)) )
( tdSql.checkData(i, 0, '12443.110129') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 321.876897998 as nchar(3)) as b from ct4")
( tdSql.checkData(i, 0, '124') for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, '124') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as bigint) as b from ct4")
( tdSql.checkData(i, 0, 12121) for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, 12121) for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as binary(16)) as b from ct4")
( tdSql.checkData(i, 0, '12121.233231') for i in range(len(tdSql.queryRows)) )
( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as binary(2)) as b from ct4")
( tdSql.checkData(i, 0, '12') for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as nchar(16)) as b from ct4")
( tdSql.checkData(i, 0, '12121.233231') for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) )
tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as nchar(2)) as b from ct4")
( tdSql.checkData(i, 0, '12') for i in range(len(tdSql.queryRows) ) )
( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) )
tdLog.printNoPrefix("==========step40: error cast condition, should return error ")
tdSql.error("select cast(c1 as int) as b from ct4")
......
......@@ -232,13 +232,13 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.stop(1)
tdDnodes.start(1)
# tdSql.execute("use db")
tdSql.execute("use db")
# tdLog.printNoPrefix("==========step4:after wal, all check again ")
# self.all_test()
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
def stop(self):
tdSql.close()
......
......@@ -49,7 +49,7 @@ class TDTestCase:
for num_col in NUM_COL:
concat_condition.extend( f"cast( {num_col} + {bool_col} as binary(16) )" for bool_col in BOOLEAN_COL )
concat_condition.extend( f"cast( {num_col} + {ts_col} as binary(16) )" for ts_col in TS_TYPE_COL )
concat_condition.extend( f"cast( {num_col} + {ts_col} as binary(16) )" for ts_col in TS_TYPE_COL if num_col is not FLOAT_COL and num_col is not DOUBLE_COL)
concat_condition.extend( f"cast( {bool_col} + {ts_col} as binary(16) )" for bool_col in BOOLEAN_COL for ts_col in TS_TYPE_COL )
......@@ -82,7 +82,7 @@ class TDTestCase:
if num > 8 or num < 2 :
[tdSql.error(f"select concat( {','.join( condition ) }) from {tbname} {where_condition} {group} ") for group in groups ]
break
tdSql.query(f"select {','.join(condition)} from {tbname} ")
rows = tdSql.queryRows
......@@ -271,13 +271,13 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.stop(1)
tdDnodes.start(1)
# tdSql.execute("use db")
tdSql.execute("use db")
# tdLog.printNoPrefix("==========step4:after wal, all check again ")
# self.all_test()
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
def stop(self):
tdSql.close()
......
......@@ -49,7 +49,7 @@ class TDTestCase:
for num_col in NUM_COL:
concat_ws_condition.extend( f"cast( {num_col} + {bool_col} as binary(16) )" for bool_col in BOOLEAN_COL )
concat_ws_condition.extend( f"cast( {num_col} + {ts_col} as binary(16) )" for ts_col in TS_TYPE_COL )
concat_ws_condition.extend( f"cast( {num_col} + {ts_col} as binary(16) )" for ts_col in TS_TYPE_COL if num_col is not FLOAT_COL and num_col is not DOUBLE_COL)
concat_ws_condition.extend( f"cast( {bool_col} + {ts_col} as binary(16) )" for bool_col in BOOLEAN_COL for ts_col in TS_TYPE_COL )
......@@ -82,7 +82,7 @@ class TDTestCase:
if num > 8 or num < 2 :
[tdSql.error(f"select concat_ws('_', {','.join( condition ) }) from {tbname} {where_condition} {group} ") for group in groups ]
break
tdSql.query(f"select {','.join(condition)} from {tbname} ")
rows = tdSql.queryRows
......@@ -271,13 +271,13 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.stop(1)
tdDnodes.start(1)
# tdSql.execute("use db")
tdSql.execute("use db")
# tdLog.printNoPrefix("==========step4:after wal, all check again ")
# self.all_test()
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
def stop(self):
tdSql.close()
......
......@@ -99,6 +99,7 @@ class TDTestCase:
if not join_flag :
tdSql.error(sql=sql)
break
if len(tblist) == 2:
if "ct1" in tblist or "t1" in tblist:
self.__join_current(sql, checkrows)
......@@ -111,42 +112,9 @@ class TDTestCase:
if len(tblist) > 2 or len(tblist) < 1:
tdSql.error(sql=sql)
# def __join_err_check(self,tbname):
# sqls = []
# for un_char_col in NUM_COL:
# sqls.extend(
# (
# f"select length( {un_char_col} ) from {tbname} ",
# f"select length(ceil( {un_char_col} )) from {tbname} ",
# f"select {un_char_col} from {tbname} group by length( {un_char_col} ) ",
# )
# )
# sqls.extend( f"select length( {un_char_col} + {un_char_col_2} ) from {tbname} " for un_char_col_2 in NUM_COL )
# sqls.extend( f"select length( {un_char_col} + {ts_col} ) from {tbname} " for ts_col in TS_TYPE_COL )
# sqls.extend( f"select {char_col} from {tbname} group by length( {char_col} ) " for char_col in CHAR_COL)
# sqls.extend( f"select length( {ts_col} ) from {tbname} " for ts_col in TS_TYPE_COL )
# sqls.extend( f"select length( {char_col} + {ts_col} ) from {tbname} " for char_col in NUM_COL for ts_col in TS_TYPE_COL)
# sqls.extend( f"select length( {char_col} + {char_col_2} ) from {tbname} " for char_col in CHAR_COL for char_col_2 in CHAR_COL )
# sqls.extend( f"select upper({char_col}, 11) from {tbname} " for char_col in CHAR_COL )
# sqls.extend( f"select upper({char_col}) from {tbname} interval(2d) sliding(1d)" for char_col in CHAR_COL )
# sqls.extend(
# (
# f"select length() from {tbname} ",
# f"select length(*) from {tbname} ",
# f"select length(ccccccc) from {tbname} ",
# f"select length(111) from {tbname} ",
# f"select length(c8, 11) from {tbname} ",
# )
# )
# return sqls
def __join_current(self, sql, checkrows):
tdSql.query(sql=sql)
tdSql.checkRows(checkrows)
# tdSql.checkRows(checkrows)
def __test_current(self):
......@@ -197,10 +165,10 @@ class TDTestCase:
tbname = ["ct1", "ct2", "ct4", "t1"]
for tb in tbname:
for errsql in self.__length_err_check(tb):
tdSql.error(sql=errsql)
tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========")
# for tb in tbname:
# for errsql in self.__join_err_check(tb):
# tdSql.error(sql=errsql)
# tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========")
def all_test(self):
......@@ -319,13 +287,13 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.stop(1)
tdDnodes.start(1)
# tdSql.execute("use db")
tdSql.execute("use db")
# tdLog.printNoPrefix("==========step4:after wal, all check again ")
# self.all_test()
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
def stop(self):
tdSql.close()
......
......@@ -233,13 +233,13 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.stop(1)
tdDnodes.start(1)
# tdSql.execute("use db")
tdSql.execute("use db")
# tdLog.printNoPrefix("==========step4:after wal, all check again ")
# self.all_test()
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
def stop(self):
tdSql.close()
......
......@@ -59,12 +59,9 @@ class TDTestCase:
groups = ["", group_having, group_no_having]
for group_condition in groups:
tdSql.query(f"select {condition} from {tbname} {where_condition} {group_condition} ")
datas = [tdSql.getData(i,0) for i in range(tdSql.queryRows)]
lower_data = [ str(data).lower() if data else None for data in datas ]
tdSql.query(f"select lower( {condition} ) from {tbname} {where_condition} {group_condition}")
for i in range(len(lower_data)):
tdSql.checkData(i, 0, lower_data[i] ) if lower_data[i] else tdSql.checkData(i, 0, None)
tdSql.query(f"select lower( {condition} ), {condition} from {tbname} {where_condition} {group_condition}")
for i in range(tdSql.queryRows):
tdSql.checkData(i, 0, str(tdSql.getData(i, 1)).lower() ) if tdSql.getData(i, 1) else tdSql.checkData(i, 0, None)
def __lower_err_check(self,tbname):
sqls = []
......@@ -230,13 +227,13 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.stop(1)
tdDnodes.start(1)
# tdSql.execute("use db")
tdSql.execute("use db")
# tdLog.printNoPrefix("==========step4:after wal, all check again ")
# self.all_test()
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
def stop(self):
tdSql.close()
......
......@@ -59,12 +59,9 @@ class TDTestCase:
groups = ["", group_having, group_no_having]
for group_condition in groups:
tdSql.query(f"select {condition} from {tbname} {where_condition} {group_condition} ")
datas = [tdSql.getData(i,0) for i in range(tdSql.queryRows)]
upper_data = [ str(data).upper() if data else None for data in datas ]
tdSql.query(f"select upper( {condition} ) from {tbname} {where_condition} {group_condition}")
for i in range(len(upper_data)):
tdSql.checkData(i, 0, upper_data[i] ) if upper_data[i] else tdSql.checkData(i, 0, None)
tdSql.query(f"select upper( {condition} ), {condition} from {tbname} {where_condition} {group_condition}")
for i in range(tdSql.queryRows):
tdSql.checkData(i, 0, str(tdSql.getData(i, 1)).upper() ) if tdSql.getData(i, 1) else tdSql.checkData(i, 0, None)
def __upper_err_check(self,tbname):
sqls = []
......@@ -229,13 +226,13 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.stop(1)
tdDnodes.start(1)
# tdSql.execute("use db")
tdSql.execute("use db")
# tdLog.printNoPrefix("==========step4:after wal, all check again ")
# self.all_test()
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
def stop(self):
tdSql.close()
......
......@@ -13,6 +13,13 @@ python3 ./test.py -f 2-query/distinct.py
python3 ./test.py -f 2-query/varchar.py
python3 ./test.py -f 2-query/ltrim.py
python3 ./test.py -f 2-query/rtrim.py
python3 ./test.py -f 2-query/length.py
python3 ./test.py -f 2-query/char_length.py
python3 ./test.py -f 2-query/upper.py
python3 ./test.py -f 2-query/lower.py
python3 ./test.py -f 2-query/join.py
# python3 ./test.py -f 2-query/concat.py # after wal ,crash occured
# python3 ./test.py -f 2-query/concat_ws.py
python3 ./test.py -f 2-query/timezone.py
python3 ./test.py -f 2-query/Now.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册